
feat: enable specifying database and collection name in local API with Neo4j (Enterprise) #1245

Open
nuoxiichn wants to merge 2 commits into MemTensor:main from nuoxiichn:feat/multi_db

Conversation

@nuoxiichn

@nuoxiichn nuoxiichn commented Mar 16, 2026

Description

This commit fixes the limitation where the local API could not target a specific database and collection, which is required for Neo4j Enterprise and multi-DB setups. It introduces new configuration fields and threads the db/collection parameters through API handlers and initialization flow. The Neo4j and Qdrant adapters are updated accordingly to create/use the specified database and collection.
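As a hedged illustration of how the new configuration fields might be generated per user, the sketch below mirrors the db_name transform shown in this PR's config.py diff (underscores become hyphens); the other keys and defaults are illustrative placeholders, not the exact implementation:

```python
import os

def neo4j_db_config(user_id: str) -> dict:
    """Sketch of per-user Neo4j multi-db config generation.

    The db_name transform mirrors the diff in src/memos/api/config.py;
    the remaining keys and defaults are assumptions for illustration.
    """
    return {
        "user": os.getenv("NEO4J_USER", "neo4j"),
        "db_name": user_id.replace("_", "-"),
        "auto_create": True,   # create the database if it does not exist
        "use_multi_db": True,  # multi-db requires Neo4j Enterprise
    }

print(neo4j_db_config("tenant_42")["db_name"])  # tenant-42
```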

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

Run the tests with uv run --with pytest pytest -q test_multidb_handlers.py test_qdrant.py. You can also change the user_id in the current examples and see a new database/collection created in the Qdrant dashboard and Neo4j Browser.

# .env
NEO4J_BACKEND=neo4j

# must be combined with the QDRANT config (host/port, or url)
  • Unit Test

Checklist

  • I have performed a self-review of my own code
  • I have commented my code in hard-to-understand areas
  • I have added tests that prove my fix is effective or that my feature works
  • I have created related documentation issue/PR in MemOS-Docs (if applicable)
  • I have linked the issue to this PR (if applicable)
  • I have mentioned the person who will review this PR

Reviewer Checklist

  • closes #xxxx (Replace xxxx with the GitHub issue number)
  • Made sure Checks passed
  • Tests have been provided

Copilot AI review requested due to automatic review settings March 16, 2026 10:17
Contributor

Copilot AI left a comment


Pull request overview

This PR adds multi-tenant routing support for Neo4j Enterprise multi-database deployments and optional external vector DB (Qdrant) syncing, allowing per-user/per-cube isolation across both graph and vector storage.

Changes:

  • Add per-user/per-cube component initialization + caching for Neo4j multi-db mode in API handlers.
  • Add optional external VecDB sync path for Neo4j graph writes and use the external VecDB for embedding search when configured.
  • Add Qdrant per-scope collection routing (via user_id / user_name) and update API Neo4j config generation to support these modes.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 10 comments.

Show a summary per file

src/memos/vec_dbs/qdrant.py — Routes operations to user-scoped Qdrant collections; adds collection auto-create/indexing helpers and multi-collection fallback reads/deletes.
src/memos/graph_dbs/neo4j.py — Adds optional Qdrant sync on writes and prefers external VecDB for search_by_embedding when configured.
src/memos/configs/graph_db.py — Extends Neo4jGraphDBConfig with optional vec_config for external vector syncing/search.
src/memos/api/handlers/search_handler.py — Adds per-db component caching/creation for Neo4j multi-db search flows.
src/memos/api/handlers/component_init.py — Introduces create_per_db_components to build isolated per-db graph/memory/search components.
src/memos/api/handlers/add_handler.py — Adds per-user component caching/creation for Neo4j multi-db add flows.
src/memos/api/config.py — Updates Neo4j config generation for multi-db naming and optional Qdrant sync configuration.


Comment on lines +307 to +320
for collection_name in self._all_candidate_collections():
    if not remaining_ids:
        break
    try:
        response = self.client.retrieve(
            collection_name=collection_name,
            ids=list(remaining_ids),
            with_payload=True,
            with_vectors=True,
        )
    except Exception:
        continue

return [
    VecDBItem(
        id=point.id,
        vector=point.vector,
        payload=point.payload,
    )
    for point in response
]

Comment on lines +405 to +408
payload = {
    "memory": memory,
    "vector_sync": "success",
    **metadata,
Comment on lines 705 to 709
         "user": os.getenv("NEO4J_USER", "neo4j"),
-        "db_name": f"memos{user_id.replace('-', '')}",
+        "db_name": f"{user_id.replace('_', '-')}",
         "password": os.getenv("NEO4J_PASSWORD", "12345678"),
         "auto_create": True,
         "use_multi_db": True,
Comment on lines +386 to +388
# Shallow-copy the shared mem_reader and point its searcher at the new database
# so deduplication reads target the correct graph store.
new_mem_reader = copy.copy(base_components["mem_reader"])
Comment on lines +135 to +145
if user_id not in self._per_user_cube_cache:
    with self._cache_lock:
        if user_id not in self._per_user_cube_cache:
            self.logger.info(
                f"[AddHandler] Creating per-user components for user_id={user_id!r}"
            )
            self._per_user_cube_cache[user_id] = create_per_db_components(
                db_name=user_id,
                base_components=vars(self.deps),
            )
return self._per_user_cube_cache[user_id]
Comment on lines 492 to +497
         point_ids: list[str | int] = ids
-        self.client.delete(
-            collection_name=self.config.collection_name,
-            points_selector=models.PointIdsList(points=point_ids),
-        )
+        for collection_name in self._all_candidate_collections():
+            self.client.delete(
+                collection_name=collection_name,
+                points_selector=models.PointIdsList(points=point_ids),
+            )
vec_filter["user_name"] = user_name

if search_filter:
vec_filter.update(search_filter)
Comment on lines +61 to +78
def _get_per_db_components(self, db_name: str) -> dict[str, Any]:
    """Return cached per-db components, creating them on first access."""
    if db_name not in self._per_db_cube_cache:
        with self._cache_lock:
            if db_name not in self._per_db_cube_cache:
                self.logger.info(
                    f"[SearchHandler] Creating per-db components for db_name={db_name!r}"
                )
                per_db = create_per_db_components(
                    db_name=db_name,
                    base_components=vars(self.deps),
                )
                per_db["deepsearch_agent"] = DeepSearchMemAgent(
                    llm=self.llm,
                    memory_retriever=per_db["text_mem"],
                )
                self._per_db_cube_cache[db_name] = per_db
    return self._per_db_cube_cache[db_name]
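The check / lock / re-check pattern above can be exercised in isolation. Below is a minimal, self-contained sketch with a stand-in factory (not the handler's real dependencies):

```python
import threading

class PerDbCache:
    """Double-checked locking: the unlocked membership test is a fast
    path; the second test under the lock prevents duplicate creation
    when two threads miss the cache at the same time."""

    def __init__(self, factory):
        self._factory = factory
        self._cache = {}
        self._lock = threading.Lock()

    def get(self, db_name: str):
        if db_name not in self._cache:            # fast path, no lock
            with self._lock:
                if db_name not in self._cache:    # re-check under lock
                    self._cache[db_name] = self._factory(db_name)
        return self._cache[db_name]

created = []
cache = PerDbCache(lambda name: created.append(name) or f"components:{name}")
cache.get("alice")
cache.get("alice")
print(created)  # the factory ran only once: ['alice']
```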
Comment on lines +238 to +241
collection_name = self._resolve_collection_name(filter_dict=filter)
if not self.collection_exists(collection_name):
    logger.info(f"Qdrant collection '{collection_name}' does not exist, returning empty search result.")
    return []
Comment on lines 279 to +290
     def get_by_id(self, id: str) -> VecDBItem | None:
         """Get a single item by ID."""
-        response = self.client.retrieve(
-            collection_name=self.config.collection_name,
-            ids=[id],
-            with_payload=True,
-            with_vectors=True,
-        )
-
-        if not response:
-            return None
+        for collection_name in self._all_candidate_collections():
+            try:
+                response = self.client.retrieve(
+                    collection_name=collection_name,
+                    ids=[id],
+                    with_payload=True,
+                    with_vectors=True,
+                )
+            except Exception:
+                continue
