|
43 | 43 | JsonDocStatusStorage, |
44 | 44 | ) |
45 | 45 |
|
| 46 | +from .prompt import GRAPH_FIELD_SEP |
| 47 | + |
46 | 48 | # future KG integrations |
47 | 49 |
|
48 | 50 | # from .kg.ArangoDB_impl import ( |
@@ -672,7 +674,7 @@ async def adelete_by_entity(self, entity_name: str): |
672 | 674 |
|
673 | 675 | try: |
674 | 676 | await self.entities_vdb.delete_entity(entity_name) |
675 | | - await self.relationships_vdb.delete_relation(entity_name) |
| 677 | + await self.relationships_vdb.delete_entity_relation(entity_name) |
676 | 678 | await self.chunk_entity_relation_graph.delete_node(entity_name) |
677 | 679 |
|
678 | 680 | logger.info( |
@@ -716,3 +718,311 @@ async def get_processing_status(self) -> Dict[str, int]: |
716 | 718 | Dict with counts for each status |
717 | 719 | """ |
718 | 720 | return await self.doc_status.get_status_counts() |
| 721 | + |
| 722 | + async def adelete_by_doc_id(self, doc_id: str): |
| 723 | + """Delete a document and all its related data |
| 724 | +
|
| 725 | + Args: |
| 726 | + doc_id: Document ID to delete |
| 727 | + """ |
| 728 | + try: |
| 729 | + # 1. Get the document status and related data |
| 730 | + doc_status = await self.doc_status.get(doc_id) |
| 731 | + if not doc_status: |
| 732 | + logger.warning(f"Document {doc_id} not found") |
| 733 | + return |
| 734 | + |
| 735 | + logger.debug(f"Starting deletion for document {doc_id}") |
| 736 | + |
| 737 | + # 2. Get all related chunks |
| 738 | + chunks = await self.text_chunks.filter( |
| 739 | + lambda x: x.get("full_doc_id") == doc_id |
| 740 | + ) |
| 741 | + chunk_ids = list(chunks.keys()) |
| 742 | + logger.debug(f"Found {len(chunk_ids)} chunks to delete") |
| 743 | + |
| 744 | + # 3. Before deleting, check the related entities and relationships for these chunks |
| 745 | + for chunk_id in chunk_ids: |
| 746 | + # Check entities |
| 747 | + entities = [ |
| 748 | + dp |
| 749 | + for dp in self.entities_vdb.client_storage["data"] |
| 750 | + if dp.get("source_id") == chunk_id |
| 751 | + ] |
| 752 | + logger.debug(f"Chunk {chunk_id} has {len(entities)} related entities") |
| 753 | + |
| 754 | + # Check relationships |
| 755 | + relations = [ |
| 756 | + dp |
| 757 | + for dp in self.relationships_vdb.client_storage["data"] |
| 758 | + if dp.get("source_id") == chunk_id |
| 759 | + ] |
| 760 | + logger.debug(f"Chunk {chunk_id} has {len(relations)} related relations") |
| 761 | + |
| 762 | + # Continue with the original deletion process... |
| 763 | + |
| 764 | + # 4. Delete chunks from vector database |
| 765 | + if chunk_ids: |
| 766 | + await self.chunks_vdb.delete(chunk_ids) |
| 767 | + await self.text_chunks.delete(chunk_ids) |
| 768 | + |
| 769 | + # 5. Find and process entities and relationships that have these chunks as source |
| 770 | + # Get all nodes in the graph |
| 771 | + nodes = self.chunk_entity_relation_graph._graph.nodes(data=True) |
| 772 | + edges = self.chunk_entity_relation_graph._graph.edges(data=True) |
| 773 | + |
| 774 | + # Track which entities and relationships need to be deleted or updated |
| 775 | + entities_to_delete = set() |
| 776 | + entities_to_update = {} # entity_name -> new_source_id |
| 777 | + relationships_to_delete = set() |
| 778 | + relationships_to_update = {} # (src, tgt) -> new_source_id |
| 779 | + |
| 780 | + # Process entities |
| 781 | + for node, data in nodes: |
| 782 | + if "source_id" in data: |
| 783 | + # Split source_id using GRAPH_FIELD_SEP |
| 784 | + sources = set(data["source_id"].split(GRAPH_FIELD_SEP)) |
| 785 | + sources.difference_update(chunk_ids) |
| 786 | + if not sources: |
| 787 | + entities_to_delete.add(node) |
| 788 | + logger.debug( |
| 789 | + f"Entity {node} marked for deletion - no remaining sources" |
| 790 | + ) |
| 791 | + else: |
| 792 | + new_source_id = GRAPH_FIELD_SEP.join(sources) |
| 793 | + entities_to_update[node] = new_source_id |
| 794 | + logger.debug( |
| 795 | + f"Entity {node} will be updated with new source_id: {new_source_id}" |
| 796 | + ) |
| 797 | + |
| 798 | + # Process relationships |
| 799 | + for src, tgt, data in edges: |
| 800 | + if "source_id" in data: |
| 801 | + # Split source_id using GRAPH_FIELD_SEP |
| 802 | + sources = set(data["source_id"].split(GRAPH_FIELD_SEP)) |
| 803 | + sources.difference_update(chunk_ids) |
| 804 | + if not sources: |
| 805 | + relationships_to_delete.add((src, tgt)) |
| 806 | + logger.debug( |
| 807 | + f"Relationship {src}-{tgt} marked for deletion - no remaining sources" |
| 808 | + ) |
| 809 | + else: |
| 810 | + new_source_id = GRAPH_FIELD_SEP.join(sources) |
| 811 | + relationships_to_update[(src, tgt)] = new_source_id |
| 812 | + logger.debug( |
| 813 | + f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}" |
| 814 | + ) |
| 815 | + |
| 816 | + # Delete entities |
| 817 | + if entities_to_delete: |
| 818 | + for entity in entities_to_delete: |
| 819 | + await self.entities_vdb.delete_entity(entity) |
| 820 | + logger.debug(f"Deleted entity {entity} from vector DB") |
| 821 | + self.chunk_entity_relation_graph.remove_nodes(list(entities_to_delete)) |
| 822 | + logger.debug(f"Deleted {len(entities_to_delete)} entities from graph") |
| 823 | + |
| 824 | + # Update entities |
| 825 | + for entity, new_source_id in entities_to_update.items(): |
| 826 | + node_data = self.chunk_entity_relation_graph._graph.nodes[entity] |
| 827 | + node_data["source_id"] = new_source_id |
| 828 | + await self.chunk_entity_relation_graph.upsert_node(entity, node_data) |
| 829 | + logger.debug( |
| 830 | + f"Updated entity {entity} with new source_id: {new_source_id}" |
| 831 | + ) |
| 832 | + |
| 833 | + # Delete relationships |
| 834 | + if relationships_to_delete: |
| 835 | + for src, tgt in relationships_to_delete: |
| 836 | + rel_id_0 = compute_mdhash_id(src + tgt, prefix="rel-") |
| 837 | + rel_id_1 = compute_mdhash_id(tgt + src, prefix="rel-") |
| 838 | + await self.relationships_vdb.delete([rel_id_0, rel_id_1]) |
| 839 | + logger.debug(f"Deleted relationship {src}-{tgt} from vector DB") |
| 840 | + self.chunk_entity_relation_graph.remove_edges( |
| 841 | + list(relationships_to_delete) |
| 842 | + ) |
| 843 | + logger.debug( |
| 844 | + f"Deleted {len(relationships_to_delete)} relationships from graph" |
| 845 | + ) |
| 846 | + |
| 847 | + # Update relationships |
| 848 | + for (src, tgt), new_source_id in relationships_to_update.items(): |
| 849 | + edge_data = self.chunk_entity_relation_graph._graph.edges[src, tgt] |
| 850 | + edge_data["source_id"] = new_source_id |
| 851 | + await self.chunk_entity_relation_graph.upsert_edge(src, tgt, edge_data) |
| 852 | + logger.debug( |
| 853 | + f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}" |
| 854 | + ) |
| 855 | + |
| 856 | + # 6. Delete original document and status |
| 857 | + await self.full_docs.delete([doc_id]) |
| 858 | + await self.doc_status.delete([doc_id]) |
| 859 | + |
| 860 | + # 7. Ensure all indexes are updated |
| 861 | + await self._insert_done() |
| 862 | + |
| 863 | + logger.info( |
| 864 | + f"Successfully deleted document {doc_id} and related data. " |
| 865 | + f"Deleted {len(entities_to_delete)} entities and {len(relationships_to_delete)} relationships. " |
| 866 | + f"Updated {len(entities_to_update)} entities and {len(relationships_to_update)} relationships." |
| 867 | + ) |
| 868 | + |
| 869 | + # Add verification step |
| 870 | + async def verify_deletion(): |
| 871 | + # Verify if the document has been deleted |
| 872 | + if await self.full_docs.get_by_id(doc_id): |
| 873 | + logger.error(f"Document {doc_id} still exists in full_docs") |
| 874 | + |
| 875 | + # Verify if chunks have been deleted |
| 876 | + remaining_chunks = await self.text_chunks.filter( |
| 877 | + lambda x: x.get("full_doc_id") == doc_id |
| 878 | + ) |
| 879 | + if remaining_chunks: |
| 880 | + logger.error(f"Found {len(remaining_chunks)} remaining chunks") |
| 881 | + |
| 882 | + # Verify entities and relationships |
| 883 | + for chunk_id in chunk_ids: |
| 884 | + # Check entities |
| 885 | + entities_with_chunk = [ |
| 886 | + dp |
| 887 | + for dp in self.entities_vdb.client_storage["data"] |
| 888 | + if chunk_id |
| 889 | + in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP) |
| 890 | + ] |
| 891 | + if entities_with_chunk: |
| 892 | + logger.error( |
| 893 | + f"Found {len(entities_with_chunk)} entities still referencing chunk {chunk_id}" |
| 894 | + ) |
| 895 | + |
| 896 | + # Check relationships |
| 897 | + relations_with_chunk = [ |
| 898 | + dp |
| 899 | + for dp in self.relationships_vdb.client_storage["data"] |
| 900 | + if chunk_id |
| 901 | + in (dp.get("source_id") or "").split(GRAPH_FIELD_SEP) |
| 902 | + ] |
| 903 | + if relations_with_chunk: |
| 904 | + logger.error( |
| 905 | + f"Found {len(relations_with_chunk)} relations still referencing chunk {chunk_id}" |
| 906 | + ) |
| 907 | + |
| 908 | + await verify_deletion() |
| 909 | + |
| 910 | + except Exception as e: |
| 911 | + logger.error(f"Error while deleting document {doc_id}: {e}") |
| 912 | + |
| 913 | + def delete_by_doc_id(self, doc_id: str): |
| 914 | + """Synchronous version of adelete""" |
| 915 | + return asyncio.run(self.adelete_by_doc_id(doc_id)) |
| 916 | + |
| 917 | + async def get_entity_info( |
| 918 | + self, entity_name: str, include_vector_data: bool = False |
| 919 | + ): |
| 920 | + """Get detailed information of an entity |
| 921 | +
|
| 922 | + Args: |
| 923 | + entity_name: Entity name (no need for quotes) |
| 924 | + include_vector_data: Whether to include data from the vector database |
| 925 | +
|
| 926 | + Returns: |
| 927 | + dict: A dictionary containing entity information, including: |
| 928 | + - entity_name: Entity name |
| 929 | + - source_id: ID(s) of the source chunk(s), joined by GRAPH_FIELD_SEP |
| 930 | + - graph_data: Complete node data from the graph database |
| 931 | + - vector_data: (optional) Data from the vector database |
| 932 | + """ |
| 933 | + entity_name = f'"{entity_name.upper()}"' |
| 934 | + |
| 935 | + # Get information from the graph |
| 936 | + node_data = await self.chunk_entity_relation_graph.get_node(entity_name) |
| 937 | + source_id = node_data.get("source_id") if node_data else None |
| 938 | + |
| 939 | + result = { |
| 940 | + "entity_name": entity_name, |
| 941 | + "source_id": source_id, |
| 942 | + "graph_data": node_data, |
| 943 | + } |
| 944 | + |
| 945 | + # Optional: Get vector database information |
| 946 | + if include_vector_data: |
| 947 | + entity_id = compute_mdhash_id(entity_name, prefix="ent-") |
| 948 | + vector_data = self.entities_vdb._client.get([entity_id]) |
| 949 | + result["vector_data"] = vector_data[0] if vector_data else None |
| 950 | + |
| 951 | + return result |
| 952 | + |
| 953 | + def get_entity_info_sync(self, entity_name: str, include_vector_data: bool = False): |
| 954 | + """Synchronous version of getting entity information |
| 955 | +
|
| 956 | + Args: |
| 957 | + entity_name: Entity name (no need for quotes) |
| 958 | + include_vector_data: Whether to include data from the vector database |
| 959 | + """ |
| 960 | + try: |
| 961 | + import tracemalloc |
| 962 | + |
| 963 | + tracemalloc.start() |
| 964 | + return asyncio.run(self.get_entity_info(entity_name, include_vector_data)) |
| 965 | + finally: |
| 966 | + tracemalloc.stop() |
| 967 | + |
| 968 | + async def get_relation_info( |
| 969 | + self, src_entity: str, tgt_entity: str, include_vector_data: bool = False |
| 970 | + ): |
| 971 | + """Get detailed information of a relationship |
| 972 | +
|
| 973 | + Args: |
| 974 | + src_entity: Source entity name (no need for quotes) |
| 975 | + tgt_entity: Target entity name (no need for quotes) |
| 976 | + include_vector_data: Whether to include data from the vector database |
| 977 | +
|
| 978 | + Returns: |
| 979 | + dict: A dictionary containing relationship information, including: |
| 980 | + - src_entity: Source entity name |
| 981 | + - tgt_entity: Target entity name |
| 982 | + - source_id: ID(s) of the source chunk(s), joined by GRAPH_FIELD_SEP |
| 983 | + - graph_data: Complete edge data from the graph database |
| 984 | + - vector_data: (optional) Data from the vector database |
| 985 | + """ |
| 986 | + src_entity = f'"{src_entity.upper()}"' |
| 987 | + tgt_entity = f'"{tgt_entity.upper()}"' |
| 988 | + |
| 989 | + # Get information from the graph |
| 990 | + edge_data = await self.chunk_entity_relation_graph.get_edge( |
| 991 | + src_entity, tgt_entity |
| 992 | + ) |
| 993 | + source_id = edge_data.get("source_id") if edge_data else None |
| 994 | + |
| 995 | + result = { |
| 996 | + "src_entity": src_entity, |
| 997 | + "tgt_entity": tgt_entity, |
| 998 | + "source_id": source_id, |
| 999 | + "graph_data": edge_data, |
| 1000 | + } |
| 1001 | + |
| 1002 | + # Optional: Get vector database information |
| 1003 | + if include_vector_data: |
| 1004 | + rel_id = compute_mdhash_id(src_entity + tgt_entity, prefix="rel-") |
| 1005 | + vector_data = self.relationships_vdb._client.get([rel_id]) |
| 1006 | + result["vector_data"] = vector_data[0] if vector_data else None |
| 1007 | + |
| 1008 | + return result |
| 1009 | + |
| 1010 | + def get_relation_info_sync( |
| 1011 | + self, src_entity: str, tgt_entity: str, include_vector_data: bool = False |
| 1012 | + ): |
| 1013 | + """Synchronous version of getting relationship information |
| 1014 | +
|
| 1015 | + Args: |
| 1016 | + src_entity: Source entity name (no need for quotes) |
| 1017 | + tgt_entity: Target entity name (no need for quotes) |
| 1018 | + include_vector_data: Whether to include data from the vector database |
| 1019 | + """ |
| 1020 | + try: |
| 1021 | + import tracemalloc |
| 1022 | + |
| 1023 | + tracemalloc.start() |
| 1024 | + return asyncio.run( |
| 1025 | + self.get_relation_info(src_entity, tgt_entity, include_vector_data) |
| 1026 | + ) |
| 1027 | + finally: |
| 1028 | + tracemalloc.stop() |
0 commit comments