-
Notifications
You must be signed in to change notification settings - Fork 198
feat: introduce StreamableHttpServerTransport.Configuration #560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+128
−51
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,8 @@ import kotlinx.serialization.json.JsonObject | |
| import kotlinx.serialization.json.decodeFromJsonElement | ||
| import kotlin.concurrent.atomics.AtomicBoolean | ||
| import kotlin.concurrent.atomics.ExperimentalAtomicApi | ||
| import kotlin.time.Duration | ||
| import kotlin.time.Duration.Companion.milliseconds | ||
| import kotlin.uuid.ExperimentalUuidApi | ||
| import kotlin.uuid.Uuid | ||
|
|
||
|
|
@@ -46,8 +48,8 @@ private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB | |
|
|
||
| /** | ||
| * A holder for an active request call. | ||
| * If enableJsonResponse is true, session is null. | ||
| * Otherwise, session is not null. | ||
| * If [StreamableHttpServerTransport.Configuration.enableJsonResponse] is true, the session is null. | ||
| * Otherwise, the session is not null. | ||
| */ | ||
| private data class SessionContext(val session: ServerSSESession?, val call: ApplicationCall) | ||
|
|
||
|
|
@@ -66,32 +68,87 @@ private data class SessionContext(val session: ServerSSESession?, val call: Appl | |
| * - No Session ID is included in any responses | ||
| * - No session validation is performed | ||
| * | ||
| * @param enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream. | ||
| * This can be useful for simple request/response scenarios without streaming. | ||
| * Default is false (SSE streams are preferred). | ||
| * @param enableDnsRebindingProtection Enable DNS rebinding protection | ||
| * (requires allowedHosts and/or allowedOrigins to be configured). | ||
| * Default is false for backwards compatibility. | ||
| * @param allowedHosts List of allowed host header values for DNS rebinding protection. | ||
| * If not specified, host validation is disabled. | ||
| * @param allowedOrigins List of allowed origin header values for DNS rebinding protection. | ||
| * If not specified, origin validation is disabled. | ||
| * @param eventStore Event store for resumability support | ||
| * If provided, resumability will be enabled, allowing clients to reconnect and resume messages | ||
| * @param retryIntervalMillis Retry interval (in milliseconds) advertised via SSE priming events | ||
| * to hint the client when to reconnect. Applies only when an [eventStore] is configured. | ||
| * Defaults to `null` (no retry hint). | ||
| * @param configuration Transport configuration. See [Configuration] for available options. | ||
| */ | ||
| @OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class) | ||
| @Suppress("TooManyFunctions") | ||
| public class StreamableHttpServerTransport( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a method for backward source compatibility?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| private val enableJsonResponse: Boolean = false, | ||
| private val enableDnsRebindingProtection: Boolean = false, | ||
| private val allowedHosts: List<String>? = null, | ||
| private val allowedOrigins: List<String>? = null, | ||
| private val eventStore: EventStore? = null, | ||
| private val retryIntervalMillis: Long? = null, | ||
| ) : AbstractTransport() { | ||
| public class StreamableHttpServerTransport(private val configuration: Configuration) : AbstractTransport() { | ||
|
|
||
| @Deprecated("Use default constructor with explicit Configuration()") | ||
| public constructor() : this(configuration = Configuration()) | ||
|
|
||
| /** | ||
| * Secondary constructor for `StreamableHttpServerTransport` that simplifies initialization by directly taking the | ||
| * configurable parameters without requiring a `Configuration` instance. | ||
| * | ||
| * @param enableJsonResponse Determines whether the server should return JSON responses. | ||
| * Defaults to `false`. | ||
| * @param enableDnsRebindingProtection Enables DNS rebinding protection. | ||
| * Defaults to `false`. | ||
| * @param allowedHosts A list of hosts allowed for server communication. | ||
| * Defaults to `null`, allowing all hosts. | ||
| * @param allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing). | ||
| * Defaults to `null`, allowing all origins. | ||
| * @param eventStore The `EventStore` instance for handling resumable events. | ||
| * Defaults to `null`, disabling resumability. | ||
| * @param retryIntervalMillis Retry interval in milliseconds for event handling or reconnection attempts. | ||
| * Defaults to `null`. | ||
| */ | ||
| @Suppress("MaxLineLength") | ||
| @Deprecated( | ||
| "Use constructor with Configuration: StreamableHttpServerTransport(Configuration(enableJsonResponse = ...))", | ||
| replaceWith = ReplaceWith( | ||
| "StreamableHttpServerTransport(Configuration(enableJsonResponse = enableJsonResponse, enableDnsRebindingProtection = enableDnsRebindingProtection, allowedHosts = allowedHosts, allowedOrigins = allowedOrigins, eventStore = eventStore, retryIntervalMillis = retryIntervalMillis))", | ||
| ), | ||
| ) | ||
| public constructor( | ||
| enableJsonResponse: Boolean = false, | ||
| enableDnsRebindingProtection: Boolean = false, | ||
| allowedHosts: List<String>? = null, | ||
| allowedOrigins: List<String>? = null, | ||
| eventStore: EventStore? = null, | ||
| retryIntervalMillis: Long? = null, | ||
| ) : this( | ||
| Configuration( | ||
| enableJsonResponse = enableJsonResponse, | ||
| enableDnsRebindingProtection = enableDnsRebindingProtection, | ||
| allowedHosts = allowedHosts, | ||
| allowedOrigins = allowedOrigins, | ||
| eventStore = eventStore, | ||
| retryInterval = retryIntervalMillis?.milliseconds, | ||
| ), | ||
| ) | ||
|
|
||
| /** | ||
| * Configuration for managing various aspects of the StreamableHttpServerTransport. | ||
| * | ||
| * @property enableJsonResponse Determines whether the server should return JSON responses. | ||
| * Defaults to `false`. | ||
| * | ||
| * @property enableDnsRebindingProtection Enables DNS rebinding protection. | ||
| * Defaults to `false`. | ||
| * | ||
| * @property allowedHosts A list of hosts allowed for server communication. | ||
| * Defaults to `null`, allowing all hosts. | ||
| * | ||
| * @property allowedOrigins A list of allowed origins for CORS (Cross-Origin Resource Sharing). | ||
| * Defaults to `null`, allowing all origins. | ||
| * | ||
| * @property eventStore The `EventStore` instance for handling resumable events. | ||
| * Defaults to `null`, disabling resumability. | ||
| * | ||
| * @property retryInterval Retry interval for event handling or reconnection attempts. | ||
| * Defaults to `null`. | ||
| */ | ||
| public class Configuration( | ||
| public val enableJsonResponse: Boolean = false, | ||
| public val enableDnsRebindingProtection: Boolean = false, | ||
| public val allowedHosts: List<String>? = null, | ||
| public val allowedOrigins: List<String>? = null, | ||
| public val eventStore: EventStore? = null, | ||
| public val retryInterval: Duration? = null, | ||
| ) | ||
|
|
||
| public var sessionId: String? = null | ||
| private set | ||
|
|
||
|
|
@@ -177,7 +234,7 @@ public class StreamableHttpServerTransport( | |
| ?: error("No connection established for request id $routingRequestId") | ||
| val activeStream = streamsMapping[streamId] | ||
|
|
||
| if (!enableJsonResponse) { | ||
| if (!configuration.enableJsonResponse) { | ||
| activeStream?.let { stream -> | ||
| emitOnStream(streamId, stream.session, message) | ||
| } | ||
|
|
@@ -194,7 +251,7 @@ public class StreamableHttpServerTransport( | |
| streamMutex.withLock { | ||
| if (activeStream == null) error("No connection established for request ID: $routingRequestId") | ||
|
|
||
| if (enableJsonResponse) { | ||
| if (configuration.enableJsonResponse) { | ||
| activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString()) | ||
| sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) } | ||
| val responses = relatedIds.mapNotNull { requestToResponseMapping[it] } | ||
|
|
@@ -261,7 +318,7 @@ public class StreamableHttpServerTransport( | |
| @Suppress("CyclomaticComplexMethod", "LongMethod", "ReturnCount", "TooGenericExceptionCaught") | ||
| public suspend fun handlePostRequest(session: ServerSSESession?, call: ApplicationCall) { | ||
| try { | ||
| if (!enableJsonResponse && session == null) { | ||
| if (!configuration.enableJsonResponse && session == null) { | ||
| error("Server session can't be null for SSE responses") | ||
| } | ||
|
|
||
|
|
@@ -328,7 +385,7 @@ public class StreamableHttpServerTransport( | |
| } | ||
|
|
||
| val streamId = Uuid.random().toString() | ||
| if (!enableJsonResponse) { | ||
| if (!configuration.enableJsonResponse) { | ||
| call.appendSseHeaders() | ||
| flushSse(session) // flush headers immediately | ||
| maybeSendPrimingEvent(streamId, session) | ||
|
|
@@ -353,7 +410,7 @@ public class StreamableHttpServerTransport( | |
|
|
||
| @Suppress("ReturnCount") | ||
| public suspend fun handleGetRequest(session: ServerSSESession?, call: ApplicationCall) { | ||
| if (enableJsonResponse) { | ||
| if (configuration.enableJsonResponse) { | ||
| call.reject( | ||
| HttpStatusCode.MethodNotAllowed, | ||
| RPCError.ErrorCode.CONNECTION_CLOSED, | ||
|
|
@@ -375,7 +432,7 @@ public class StreamableHttpServerTransport( | |
|
|
||
| if (!validateSession(call) || !validateProtocolVersion(call)) return | ||
|
|
||
| eventStore?.let { store -> | ||
| configuration.eventStore?.let { store -> | ||
| call.request.header(MCP_RESUMPTION_TOKEN_HEADER)?.let { lastEventId -> | ||
| replayEvents(store, lastEventId, sseSession) | ||
| return | ||
|
|
@@ -413,7 +470,7 @@ public class StreamableHttpServerTransport( | |
| */ | ||
| @Suppress("ReturnCount", "TooGenericExceptionCaught") | ||
| public suspend fun closeSseStream(requestId: RequestId) { | ||
| if (enableJsonResponse) return | ||
| if (configuration.enableJsonResponse) return | ||
| val streamId = requestToStreamMapping[requestId] ?: return | ||
| val sessionContext = streamsMapping[streamId] ?: return | ||
|
|
||
|
|
@@ -562,9 +619,9 @@ public class StreamableHttpServerTransport( | |
|
|
||
| @Suppress("ReturnCount") | ||
| private fun validateHeaders(call: ApplicationCall): String? { | ||
| if (!enableDnsRebindingProtection) return null | ||
| if (!configuration.enableDnsRebindingProtection) return null | ||
|
|
||
| allowedHosts?.let { hosts -> | ||
| configuration.allowedHosts?.let { hosts -> | ||
| val hostHeader = call.request.headers[HttpHeaders.Host]?.lowercase() | ||
| val allowedHostsLowercase = hosts.map { it.lowercase() } | ||
|
|
||
|
|
@@ -573,7 +630,7 @@ public class StreamableHttpServerTransport( | |
| } | ||
| } | ||
|
|
||
| allowedOrigins?.let { origins -> | ||
| configuration.allowedOrigins?.let { origins -> | ||
| val originHeader = call.request.headers[HttpHeaders.Origin]?.lowercase() | ||
| val allowedOriginsLowercase = origins.map { it.lowercase() } | ||
|
|
||
|
|
@@ -636,7 +693,7 @@ public class StreamableHttpServerTransport( | |
| this?.lowercase()?.contains(mime.toString().lowercase()) == true | ||
|
|
||
| private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) { | ||
| val eventId = eventStore?.storeEvent(streamId, message) | ||
| val eventId = configuration.eventStore?.storeEvent(streamId, message) | ||
| try { | ||
| session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message)) | ||
| } catch (_: Exception) { | ||
|
|
@@ -646,11 +703,15 @@ public class StreamableHttpServerTransport( | |
|
|
||
| @Suppress("TooGenericExceptionCaught") | ||
| private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) { | ||
| val store = eventStore ?: return | ||
| val store = configuration.eventStore ?: return | ||
| val sseSession = session ?: return | ||
| try { | ||
| val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage) | ||
| sseSession.send(id = primingEventId, retry = retryIntervalMillis, data = "") | ||
| sseSession.send( | ||
| id = primingEventId, | ||
| retry = configuration.retryInterval?.inWholeMilliseconds, | ||
| data = "", | ||
| ) | ||
| } catch (e: Exception) { | ||
| _onError(e) | ||
| } | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is passing now