Skip to content

Commit c5b60f2

Browse files
Integrate Nexus gateway chat/streaming endpoints (#691)
* feat: invoke nexus gateway chat and stream endpoints
* fix: use nexus as provider name
* fix: llm streaming response and token building
* chore: upgrade langchain

---------

Co-authored-by: Maryam Khidir <[email protected]>
1 parent 5dfcb71 commit c5b60f2

File tree

21 files changed

+1278
-68
lines changed

21 files changed

+1278
-68
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import json
2+
import uuid
3+
import pytest
4+
import os
5+
import time
6+
7+
8+
@pytest.mark.skipif(
    not all(
        [
            os.getenv("NEXUS_GATEWAY_URL"),
            os.getenv("NEXUS_AUTH_CLIENT_ID"),
            os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
            os.getenv("NEXUS_AUTH_TOKEN_URL"),
        ]
    ),
    reason="Nexus Gateway credentials not configured",
)
def test_nexus_chat_request(client, nexus_model):
    """Test chat request using Nexus model through chatbot API.

    Sends a single "run" request with provider ``nexus`` under a fresh
    session id, polls the session for up to 30 seconds until its history
    holds exactly two entries, and checks that they are a human message
    followed by a non-empty AI message.

    Args:
        client: Test client fixture exposing ``send_query``, ``get_session``
            and ``delete_session``.
        nexus_model: Model name fixture used as ``modelName`` in the request.
    """
    session_id = str(uuid.uuid4())

    request = {
        "action": "run",
        "modelInterface": "langchain",
        "data": {
            "mode": "chain",
            "text": "Hello, please respond with just 'Hi there!'",
            "files": [],
            "modelName": nexus_model,
            "provider": "nexus",
            "sessionId": session_id,
        },
    }

    client.send_query(json.dumps(request))

    try:
        # Wait for session to be created and validate through session history
        session = None
        for _ in range(30):
            time.sleep(1)
            session = client.get_session(session_id)
            if session and len(session.get("history") or []) == 2:
                break

        assert session is not None
        assert session.get("id") == session_id

        history = session.get("history") or []
        assert len(history) == 2
        assert history[0].get("type") == "human"
        assert history[1].get("type") == "ai"
        assert len(history[1].get("content", "")) > 0

        print(f"Nexus chat response: {history[1]['content']}")
    finally:
        # Cleanup runs even when an assertion above fails, so a failed run
        # does not leave the session behind.
        client.delete_session(session_id)
59+
60+
61+
@pytest.mark.skipif(
    not all(
        [
            os.getenv("NEXUS_GATEWAY_URL"),
            os.getenv("NEXUS_AUTH_CLIENT_ID"),
            os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
            os.getenv("NEXUS_AUTH_TOKEN_URL"),
        ]
    ),
    reason="Nexus Gateway credentials not configured",
)
def test_nexus_streaming_request(client, nexus_model):
    """Test streaming chat request using Nexus model.

    Sends a single "run" request with provider ``nexus`` and
    ``"streaming": True`` under a fresh session id, polls the session for up
    to 30 seconds until its history holds exactly two entries, and checks
    that they are a human message followed by a non-empty AI message.

    Args:
        client: Test client fixture exposing ``send_query``, ``get_session``
            and ``delete_session``.
        nexus_model: Model name fixture used as ``modelName`` in the request.
    """
    session_id = str(uuid.uuid4())

    request = {
        "action": "run",
        "modelInterface": "langchain",
        "data": {
            "mode": "chain",
            "text": "Count from 1 to 5",
            "files": [],
            "modelName": nexus_model,
            "provider": "nexus",
            "sessionId": session_id,
            "streaming": True,
        },
    }

    client.send_query(json.dumps(request))

    try:
        # Wait for session to be created and validate through session history
        session = None
        for _ in range(30):
            time.sleep(1)
            session = client.get_session(session_id)
            if session and len(session.get("history") or []) == 2:
                break

        assert session is not None
        assert session.get("id") == session_id

        history = session.get("history") or []
        assert len(history) == 2
        assert history[0].get("type") == "human"
        assert history[1].get("type") == "ai"
        assert len(history[1].get("content", "")) > 0

        print(f"Nexus streaming response: {history[1]['content']}")
    finally:
        # Cleanup runs even when an assertion above fails, so a failed run
        # does not leave the session behind.
        client.delete_session(session_id)
113+
114+
115+
@pytest.mark.skipif(
    not all(
        [
            os.getenv("NEXUS_GATEWAY_URL"),
            os.getenv("NEXUS_AUTH_CLIENT_ID"),
            os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
            os.getenv("NEXUS_AUTH_TOKEN_URL"),
        ]
    ),
    reason="Nexus Gateway credentials not configured",
)
def test_nexus_conversation_history(client, nexus_model):
    """Test conversation history with Nexus model.

    Sends two messages in the same session: the first states a name, the
    second asks for it back. The test passes when the session ends up with
    four history entries and the second AI reply mentions the name
    (case-insensitively).

    Args:
        client: Test client fixture exposing ``send_query``, ``get_session``
            and ``delete_session``.
        nexus_model: Model name fixture used as ``modelName`` in both requests.
    """
    session_id = str(uuid.uuid4())

    # First message
    request1 = {
        "action": "run",
        "modelInterface": "langchain",
        "data": {
            "mode": "chain",
            "text": "My name is Alice",
            "files": [],
            "modelName": nexus_model,
            "provider": "nexus",
            "sessionId": session_id,
        },
    }

    client.send_query(json.dumps(request1))

    try:
        # Wait for first response
        session = None
        for _ in range(30):
            time.sleep(1)
            session = client.get_session(session_id)
            if session and len(session.get("history") or []) == 2:
                break

        # Fail here, with a clear message, rather than sending the follow-up
        # into a session whose first exchange never completed.
        assert session is not None, "session was not created"
        assert (
            len(session.get("history") or []) == 2
        ), "first exchange did not complete"

        # Second message referencing first
        request2 = {
            "action": "run",
            "modelInterface": "langchain",
            "data": {
                "mode": "chain",
                "text": "What is my name?",
                "files": [],
                "modelName": nexus_model,
                "provider": "nexus",
                "sessionId": session_id,
            },
        }

        client.send_query(json.dumps(request2))

        # Wait for second response
        final_session = None
        for _ in range(30):
            time.sleep(1)
            final_session = client.get_session(session_id)
            if final_session and len(final_session.get("history") or []) == 4:
                break

        assert final_session is not None

        history = final_session.get("history") or []
        assert len(history) == 4
        assert history[2].get("type") == "human"
        assert history[3].get("type") == "ai"

        second_response = history[3]["content"]
        # The lower-cased check already covers the exact-case "Alice" match.
        assert "alice" in second_response.lower()

        print(f"First response: {history[1]['content']}")
        print(f"Second response: {second_response}")
    finally:
        # Cleanup runs even when an assertion above fails, so a failed run
        # does not leave the session behind.
        client.delete_session(session_id)
194+
195+
196+
@pytest.fixture(scope="session")
def nexus_model():
    """Return the model name used by the Nexus tests.

    Reads ``NEXUS_TEST_MODEL_NAME`` from the environment and falls back to
    ``"gpt-3.5-turbo"`` when the variable is not set.
    """
    model_name = os.environ.get("NEXUS_TEST_MODEL_NAME", "gpt-3.5-turbo")
    return model_name
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
"""Integration tests for Nexus Gateway Client"""
2+
3+
import os
import sys

import pytest

# Add the python-sdk to the path. This has to run before the genai_core
# imports below; inserting the path after them has no effect on whether
# those imports can be resolved.
sys.path.insert(
    0, os.path.join(os.path.dirname(__file__), "../lib/shared/layers/python-sdk/python")
)

from genai_core.model_providers.nexus.nexus_client import (  # noqa: E402
    NexusGatewayClient,
)
from genai_core.model_providers.nexus.types import (  # noqa: E402
    NexusGatewayConfig,
)
14+
15+
16+
class TestNexusGatewayClientIntegration:
    """Real integration tests for NexusGatewayClient.

    Every test is skipped unless ``NEXUS_GATEWAY_URL``,
    ``NEXUS_AUTH_CLIENT_ID``, ``NEXUS_AUTH_CLIENT_SECRET`` and
    ``NEXUS_AUTH_TOKEN_URL`` are set in the environment. The Bedrock converse
    tests additionally require ``NEXUS_TEST_MODEL_ID``.
    """

    # Skip mark for tests that only need gateway credentials.
    _requires_credentials = pytest.mark.skipif(
        not all(
            [
                os.getenv("NEXUS_GATEWAY_URL"),
                os.getenv("NEXUS_AUTH_CLIENT_ID"),
                os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
                os.getenv("NEXUS_AUTH_TOKEN_URL"),
            ]
        ),
        reason="Real Nexus credentials not provided",
    )

    # Skip mark for tests that also need a model id to invoke.
    _requires_credentials_and_model = pytest.mark.skipif(
        not all(
            [
                os.getenv("NEXUS_GATEWAY_URL"),
                os.getenv("NEXUS_AUTH_CLIENT_ID"),
                os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
                os.getenv("NEXUS_AUTH_TOKEN_URL"),
                os.getenv("NEXUS_TEST_MODEL_ID"),
            ]
        ),
        reason="Real Nexus credentials and test model not provided",
    )

    @pytest.fixture
    def nexus_config(self):
        """Create configuration from environment variables"""
        return NexusGatewayConfig(
            gateway_url=os.getenv("NEXUS_GATEWAY_URL"),
            client_id=os.getenv("NEXUS_AUTH_CLIENT_ID"),
            client_secret=os.getenv("NEXUS_AUTH_CLIENT_SECRET"),
            token_url=os.getenv("NEXUS_AUTH_TOKEN_URL"),
            enabled=True,
        )

    @pytest.fixture
    def nexus_client(self, nexus_config):
        """Create NexusGatewayClient instance"""
        return NexusGatewayClient(nexus_config)

    @staticmethod
    def _extract_stream_text(content):
        """Concatenate the text deltas found in a raw converse-stream payload.

        Args:
            content: Raw stream body as ``bytes``, or an already decoded
                ``str``.

        Returns:
            The ``delta["text"]`` values of every parseable JSON object that
            follows a ``contentBlockDelta`` marker, joined in order of
            appearance. Returns ``""`` when no such object is found.
        """
        # Function-scope import: json is not imported at the top of this file.
        import json

        if isinstance(content, (bytes, bytearray)):
            content = content.decode("utf-8", errors="ignore")

        decoder = json.JSONDecoder()
        pieces = []
        # Text before the first marker carries no delta, so it is skipped.
        for part in content.split("contentBlockDelta")[1:]:
            json_start = part.find('{"contentBlockIndex"')
            if json_start < 0:
                continue
            try:
                # raw_decode stops at the end of the first complete JSON
                # value, so braces inside string values cannot confuse it the
                # way manual brace counting can.
                data, _ = decoder.raw_decode(part, json_start)
            except json.JSONDecodeError:
                # Truncated or malformed chunk: skip it and keep the rest.
                continue
            delta = data.get("delta")
            if isinstance(delta, dict) and "text" in delta:
                pieces.append(delta["text"])
        return "".join(pieces)

    @_requires_credentials
    def test_real_token_request(self, nexus_client):
        """Test real token request to Nexus Gateway"""
        token = nexus_client.get_access_token()
        assert token is not None
        assert isinstance(token, str)
        assert len(token) > 0
        # Report only the length: printing part of a live access token would
        # put credential material into test logs.
        print(f"Successfully obtained token ({len(token)} characters)")

    @_requires_credentials
    def test_real_list_application_models(self, nexus_client):
        """Test real list_application_models request"""
        models = nexus_client.list_application_models()
        assert isinstance(models, list)
        print(f"Found {len(models)} models")
        if models:
            print(f"First model: {models[0]}")

    @_requires_credentials_and_model
    def test_real_bedrock_converse(self, nexus_client):
        """Test real bedrock converse request"""
        model_id = os.getenv("NEXUS_TEST_MODEL_ID")
        body = {
            "messages": [
                {"role": "user", "content": [{"text": "Hello, how are you?"}]}
            ],
            "inferenceConfig": {"maxTokens": 100, "temperature": 0.7},
        }

        response = nexus_client.invoke_bedrock_converse(model_id, body)
        print(f"Response: {response}")

        assert response is not None
        assert "output" in response

    @_requires_credentials_and_model
    def test_real_bedrock_converse_stream(self, nexus_client):
        """Test real bedrock converse stream request.

        When the response carries a ``"stream"`` entry, its raw ``content`` is
        parsed for ``contentBlockDelta`` events and the concatenated text must
        be non-empty. Otherwise the response only has to be a dict.
        """
        model_id = os.getenv("NEXUS_TEST_MODEL_ID")
        body = {
            "messages": [{"role": "user", "content": [{"text": "Count from 1 to 3"}]}],
            "inferenceConfig": {"maxTokens": 100, "temperature": 0.7},
        }

        response = nexus_client.invoke_bedrock_converse_stream(model_id, body)
        print(f"Streaming response: {response}")

        assert response is not None

        if "stream" not in response:
            # Handle regular response
            assert isinstance(response, dict)
            print(f"Response: {response}")
            return

        # Errors while reading or parsing the stream are left to propagate so
        # the test fails with the real traceback instead of passing silently.
        content = response["stream"].content
        assert content, "No content in streaming response"

        full_response = self._extract_stream_text(content)
        print(f"Parsed streaming response: {full_response}")
        assert full_response, "No text deltas found in streaming response"

lib/model-interfaces/langchain/functions/request-handler/adapters/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
from .bedrock_agent import *
77
from .base import Mode
88
from .shared import *
9+
from .nexus import *
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# flake8: noqa
2+
from adapters.nexus.base import *
3+
from adapters.nexus.stream import NexusChatStreamAdapter
4+
from genai_core.registry import registry
5+
6+
# Register nexus adapters
# NOTE(review): presumably the registry matches this pattern against the
# model/provider identifier, so any identifier starting with "nexus" would
# resolve to NexusChatStreamAdapter — confirm against genai_core.registry.
7+
registry.register(r"^nexus.*", NexusChatStreamAdapter)

0 commit comments

Comments
 (0)