feat: enable specifying database and collection name in local API with Neo4j (Enterprise) #1245
Open
nuoxiichn wants to merge 2 commits into MemTensor:main from
Conversation
…neo4j(enterprise)
Contributor
Pull request overview
This PR adds multi-tenant routing support for Neo4j Enterprise multi-database deployments and optional external vector DB (Qdrant) syncing, allowing per-user/per-cube isolation across both graph and vector storage.
Changes:
- Add per-user/per-cube component initialization + caching for Neo4j multi-db mode in API handlers.
- Add optional external VecDB sync path for Neo4j graph writes and use the external VecDB for embedding search when configured.
- Add Qdrant per-scope collection routing (via user_id/user_name) and update API Neo4j config generation to support these modes.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| src/memos/vec_dbs/qdrant.py | Routes operations to user-scoped Qdrant collections; adds collection auto-create/indexing helpers and multi-collection fallback reads/deletes. |
| src/memos/graph_dbs/neo4j.py | Adds optional Qdrant sync on writes and prefers the external VecDB for search_by_embedding when configured. |
| src/memos/configs/graph_db.py | Extends Neo4jGraphDBConfig with an optional vec_config for external vector syncing/search. |
| src/memos/api/handlers/search_handler.py | Adds per-db component caching/creation for Neo4j multi-db search flows. |
| src/memos/api/handlers/component_init.py | Introduces create_per_db_components to build isolated per-db graph/memory/search components. |
| src/memos/api/handlers/add_handler.py | Adds per-user component caching/creation for Neo4j multi-db add flows. |
| src/memos/api/config.py | Updates Neo4j config generation for multi-db naming and optional Qdrant sync configuration. |
Comment on lines +307 to +320

```python
for collection_name in self._all_candidate_collections():
    if not remaining_ids:
        break
    try:
        response = self.client.retrieve(
            collection_name=collection_name,
            ids=list(remaining_ids),
            with_payload=True,
            with_vectors=True,
        )
    except Exception:
        continue

return [
    VecDBItem(
        id=point.id,
        vector=point.vector,
        payload=point.payload,
    )
    for point in response
]
```
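The fallback-read loop reviewed above can be exercised in isolation with a stand-in client. FakeClient, retrieve_with_fallback, and the collection layout here are illustrative, not part of the PR:

```python
class FakeClient:
    """Toy stand-in for a Qdrant client; raises KeyError for unknown collections."""

    def __init__(self, collections):
        # {collection_name: {point_id: (vector, payload)}}
        self.collections = collections

    def retrieve(self, collection_name, ids, **_):
        store = self.collections[collection_name]  # KeyError if collection missing
        return [(i, *store[i]) for i in ids if i in store]


def retrieve_with_fallback(client, candidate_collections, ids):
    """Scan candidate collections until every requested id is found."""
    remaining = set(ids)
    found = {}
    for name in candidate_collections:
        if not remaining:
            break
        try:
            points = client.retrieve(collection_name=name, ids=list(remaining))
        except Exception:
            continue  # skip collections that do not exist
        for pid, vec, payload in points:
            found[pid] = (vec, payload)
            remaining.discard(pid)
    return found


client = FakeClient({
    "memos_alice": {"a1": ([0.1], {"user": "alice"})},
    "memos_bob": {"b1": ([0.2], {"user": "bob"})},
})
result = retrieve_with_fallback(client, ["missing", "memos_alice", "memos_bob"], ["a1", "b1"])
```

The nonexistent "missing" collection is skipped via the except/continue, and the scan stops early once all ids are resolved.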
Comment on lines +405 to +408

```python
payload = {
    "memory": memory,
    "vector_sync": "success",
    **metadata,
```
Comment on lines 705 to 709

```diff
         "user": os.getenv("NEO4J_USER", "neo4j"),
-        "db_name": f"memos{user_id.replace('-', '')}",
+        "db_name": f"{user_id.replace('_', '-')}",
         "password": os.getenv("NEO4J_PASSWORD", "12345678"),
         "auto_create": True,
         "use_multi_db": True,
```
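The db_name change above lines up with Neo4j's database naming rules: names start with an ASCII letter and may contain only letters, digits, dots, and dashes, so underscores must be replaced. A hypothetical validator sketching that constraint; neo4j_db_name is not a function in this PR:

```python
import re


def neo4j_db_name(user_id: str) -> str:
    """Map a user id to a Neo4j-safe database name (illustrative helper)."""
    # Lowercase and swap underscores for dashes, mirroring the diff above.
    name = user_id.lower().replace("_", "-")
    # Neo4j names must begin with a letter and use only [a-z0-9.-].
    if not re.fullmatch(r"[a-z][a-z0-9.-]*", name):
        raise ValueError(f"invalid Neo4j database name: {name!r}")
    return name
```

Validating up front gives a clear error instead of a driver failure when auto_create later tries to create the database.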
Comment on lines +386 to +388

```python
# Shallow-copy the shared mem_reader and point its searcher at the new database
# so deduplication reads target the correct graph store.
new_mem_reader = copy.copy(base_components["mem_reader"])
```
Comment on lines +135 to +145

```python
if user_id not in self._per_user_cube_cache:
    with self._cache_lock:
        if user_id not in self._per_user_cube_cache:
            self.logger.info(
                f"[AddHandler] Creating per-user components for user_id={user_id!r}"
            )
            self._per_user_cube_cache[user_id] = create_per_db_components(
                db_name=user_id,
                base_components=vars(self.deps),
            )
return self._per_user_cube_cache[user_id]
```
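The snippet above is the classic check-lock-check pattern: an unlocked fast path for cache hits, then a re-check under the lock so two threads racing on the same user build the components only once. A minimal generic re-creation, with create_components standing in for create_per_db_components:

```python
import threading


class PerKeyComponentCache:
    """Double-checked-locking cache keyed by user/db name (illustrative)."""

    def __init__(self, create_components):
        self._cache = {}
        self._lock = threading.Lock()
        self._create = create_components

    def get(self, key):
        if key not in self._cache:            # fast path: no lock on hits
            with self._lock:
                if key not in self._cache:    # re-check under the lock
                    self._cache[key] = self._create(key)
        return self._cache[key]


calls = []
cache = PerKeyComponentCache(lambda k: calls.append(k) or {"db": k})
cache.get("alice")
cache.get("alice")  # second lookup hits the cache; no second build
```

The outer unlocked check keeps steady-state lookups lock-free, which matters when every add/search request routes through this cache.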
Comment on lines 492 to +497

```diff
 point_ids: list[str | int] = ids
-self.client.delete(
-    collection_name=self.config.collection_name,
-    points_selector=models.PointIdsList(points=point_ids),
-)
+for collection_name in self._all_candidate_collections():
+    self.client.delete(
+        collection_name=collection_name,
+        points_selector=models.PointIdsList(points=point_ids),
+    )
```
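The new delete path broadcasts the id selector to every candidate collection, since the caller may not know which scoped collection holds the points. A toy sketch of that behavior; FakeClient and delete_everywhere are illustrative stand-ins, not the PR's code:

```python
class FakeClient:
    """Toy stand-in: each collection is just a set of point ids."""

    def __init__(self, collections):
        self.collections = collections  # {name: set of ids}

    def delete(self, collection_name, point_ids):
        self.collections[collection_name] -= set(point_ids)


def delete_everywhere(client, candidates, point_ids):
    # Broadcast the delete to all candidate collections; deleting ids
    # that a collection does not contain is a no-op.
    for name in candidates:
        client.delete(collection_name=name, point_ids=point_ids)


client = FakeClient({"memos_a": {"x", "y"}, "memos_b": {"x"}})
delete_everywhere(client, ["memos_a", "memos_b"], ["x"])
```

The trade-off is one delete call per candidate collection, in exchange for not having to resolve which scope owns each id.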
```python
vec_filter["user_name"] = user_name

if search_filter:
    vec_filter.update(search_filter)
```
Comment on lines +61 to +78

```python
def _get_per_db_components(self, db_name: str) -> dict[str, Any]:
    """Return cached per-db components, creating them on first access."""
    if db_name not in self._per_db_cube_cache:
        with self._cache_lock:
            if db_name not in self._per_db_cube_cache:
                self.logger.info(
                    f"[SearchHandler] Creating per-db components for db_name={db_name!r}"
                )
                per_db = create_per_db_components(
                    db_name=db_name,
                    base_components=vars(self.deps),
                )
                per_db["deepsearch_agent"] = DeepSearchMemAgent(
                    llm=self.llm,
                    memory_retriever=per_db["text_mem"],
                )
                self._per_db_cube_cache[db_name] = per_db
    return self._per_db_cube_cache[db_name]
```
Comment on lines +238 to +241

```python
collection_name = self._resolve_collection_name(filter_dict=filter)
if not self.collection_exists(collection_name):
    logger.info(f"Qdrant collection '{collection_name}' does not exist, returning empty search result.")
    return []
```
Comment on lines 279 to +290

```diff
 def get_by_id(self, id: str) -> VecDBItem | None:
     """Get a single item by ID."""
-    response = self.client.retrieve(
-        collection_name=self.config.collection_name,
-        ids=[id],
-        with_payload=True,
-        with_vectors=True,
-    )
-    if not response:
-        return None
+    for collection_name in self._all_candidate_collections():
+        try:
+            response = self.client.retrieve(
+                collection_name=collection_name,
+                ids=[id],
+                with_payload=True,
+                with_vectors=True,
+            )
+        except Exception:
+            continue
```
Description
This commit fixes the limitation where the local API could not target a specific database and collection, which is required for Neo4j Enterprise and multi-DB setups. It introduces new configuration fields and threads the db/collection parameters through API handlers and initialization flow. The Neo4j and Qdrant adapters are updated accordingly to create/use the specified database and collection.
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Run the tests with `uv run --with pytest pytest -q test_multidb_handlers.py test_qdrant.py`. You can also change the user_id in the current examples and watch the new database/collection appear in the Qdrant dashboard and Neo4j Browser.
Checklist
Reviewer Checklist