Graphiti Implementation Guide for OSINT¶
Purpose: Step-by-step guide to implementing custom OSINT entities and relationships in Graphiti knowledge graph
Table of Contents¶
- Prerequisites
- Setup
- Defining Custom Entities
- Defining Custom Relationships
- Episode Processing
- Knowledge Graph Queries
- Integration with OSINT Workflows
Prerequisites¶
Required Systems¶
# Graphiti with Neo4j or FalkorDB
pip install graphiti-core
# Neo4j (recommended) or FalkorDB
# Neo4j: 5.x+
# FalkorDB: Latest
Configuration¶
# Set environment variables
export NEO4J_URI="bolt://localhost:7687"
export NEO4J_USER="neo4j"
export NEO4J_PASSWORD="your_password"
export GRAPHITI_WORKSPACE="/path/to/workspace"
Setup¶
1. Project Structure¶
madeinoz-knowledge-system/
├── graphiti_osint/
│ ├── __init__.py
│ ├── entities/
│ │ ├── __init__.py
│ │ ├── person.py
│ │ ├── organization.py
│ │ ├── account.py
│ │ ├── domain.py
│ │ ├── email.py
│ │ ├── phone.py
│ │ ├── location.py
│ │ └── image.py
│ ├── relationships/
│ │ ├── __init__.py
│ │ ├── identity_resolution.py
│ │ ├── professional.py
│ │ └── corporate.py
│ ├── processors/
│ │ ├── __init__.py
│ │ ├── osint_base.py
│ │ ├── username_recon.py
│ │ ├── email_recon.py
│ │ └── investigation.py
│ └── config.py
└── tests/
├── test_entities.py
└── test_processors.py
2. Initialize Graphiti¶
# graphiti_osint/config.py
from graphiti_core import Graphiti
from graphiti_core.llm_loader import OpenAILLMLoader
from neo4j import GraphDatabase
class OSINTGraphConfig:
"""Configuration for OSINT knowledge graph"""
# Neo4j connection
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your_password"
# Graphiti workspace
WORKSPACE_DIR = "./workspace"
# LLM for entity extraction
LLM_CLIENT = OpenAILLMLoader()
# Knowledge graph groups
GROUPS = {
"osint-people": "Person entities",
"osint-organizations": "Organization entities",
"osint-accounts": "Platform accounts",
"osint-domains": "Domain/DNS entities",
"osint-emails": "Email entities",
"osint-phones": "Phone entities",
"osint-locations": "Location entities",
"osint-images": "Image entities",
"osint-investigations": "Investigation cases",
}
def get_graphiti() -> Graphiti:
"""Get configured Graphiti instance"""
db = GraphDatabase(
OSINTGraphConfig.NEO4J_URI,
auth=(OSINTGraphConfig.NEO4J_USER, OSINTGraphConfig.NEO4J_PASSWORD)
)
return Graphiti(
database=db,
llm=OSINTGraphConfig.LLM_CLIENT,
workspace_dir=OSINTGraphConfig.WORKSPACE_DIR
)
Defining Custom Entities¶
Person Entity Implementation¶
# graphiti_osint/entities/person.py
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum
class AccountStatus(str, Enum):
ACTIVE = "Active"
INACTIVE = "Inactive"
SUSPENDED = "Suspended"
DELETED = "Deleted"
PRIVATE = "Private"
class PersonEntity(BaseModel):
"""Person entity for OSINT investigations"""
# Primary Key
uuid: str = Field(..., description="Unique person identifier")
# Core Identity
primary_name: str = Field(..., description="Full legal name")
given_name: str = Field(..., description="First name")
family_name: str = Field(..., description="Last name")
middle_name: Optional[str] = Field(None, description="Middle name")
# Identity Variants
aliases: List[str] = Field(default_factory=list, description="AKA, maiden names")
usernames: Dict[str, str] = Field(
default_factory=dict,
description="Platform-specific usernames"
)
emails: List[str] = Field(default_factory=list, description="Associated emails")
phones: List[str] = Field(default_factory=list, description="Associated phones")
# Temporal
born_at: Optional[datetime] = Field(None, description="Date of birth")
active_since: Optional[datetime] = Field(None, description="First online appearance")
last_seen: Optional[datetime] = Field(None, description="Most recent activity")
# Location
locations: List[str] = Field(default_factory=list, description="Known locations")
# Professional
occupations: List[str] = Field(default_factory=list, description="Job titles")
employers: List[str] = Field(default_factory=list, description="Current/past employers")
# Investigation Metadata
confidence: float = Field(default=0.0, ge=0.0, le=100.0, description="Confidence score 0-100")
sources: List[str] = Field(default_factory=list, description="Data source URLs/references")
# Temporal Metadata
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
valid_from: datetime = Field(default_factory=datetime.now)
valid_until: Optional[datetime] = Field(None, description="When this info stops being true")
def to_graphiti_episode(self) -> str:
"""Convert to Graphiti episode format"""
episode_parts = [
f"Person: {self.primary_name}",
f"Born: {self.born_at.strftime('%Y-%m-%d') if self.born_at else 'Unknown'}",
]
if self.aliases:
episode_parts.append(f"AKA: {', '.join(self.aliases)}")
if self.usernames:
usernames_str = ', '.join([
f"{platform}: @{username}"
for platform, username in self.usernames.items()
])
episode_parts.append(f"Usernames: {usernames_str}")
if self.emails:
episode_parts.append(f"Emails: {', '.join(self.emails)}")
if self.employers:
episode_parts.append(f"Employers: {', '.join(self.employers)}")
episode_parts.append(f"Confidence: {self.confidence}%")
episode_parts.append(f"Sources: {', '.join(self.sources)}")
return '\n'.join(episode_parts)
class Config:
json_schema_extra = {
"example": {
"uuid": "person_abc123",
"primary_name": "John Smith",
"given_name": "John",
"family_name": "Smith",
"usernames": {
"twitter": "@jsmith",
"github": "jsmith-dev"
},
"emails": ["john@example.com"],
"confidence": 85.0
}
}
Organization Entity¶
# graphiti_osint/entities/organization.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from enum import Enum
class EntityStatus(str, Enum):
ACTIVE = "Active"
INACTIVE = "Inactive"
DISSOLVED = "Dissolved"
class OrganizationEntity(BaseModel):
"""Organization entity for OSINT investigations"""
uuid: str
entity_type: str = "Organization"
# Legal Identity
legal_name: str
dba_names: List[str] = Field(default_factory=list)
registration_number: str
entity_type: str # Corp, LLC, Ltd, NonProfit
jurisdiction: str
status: EntityStatus
# Temporal
founded_on: datetime
dissolved_on: Optional[datetime] = None
incorporated_at: Optional[datetime] = None
# Contact
headquarters: str
website: Optional[str] = None
phone: Optional[str] = None
email: Optional[str] = None
# Structure
subsidiaries: List[str] = Field(default_factory=list)
parent_company: Optional[str] = None
directors: List[str] = Field(default_factory=list)
officers: List[str] = Field(default_factory=list)
employees: int = 0
# Financial
annual_revenue: Optional[float] = None
revenue_currency: str = "USD"
revenue_year: int = 0
# Metadata
confidence: float = 0.0
sources: List[str] = Field(default_factory=list)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
valid_from: datetime = Field(default_factory=datetime.now)
valid_until: Optional[datetime] = None
Defining Custom Relationships¶
Identity Resolution Relationship¶
# graphiti_osint/relationships/identity_resolution.py
from graphiti_core.edges import Edge
from datetime import datetime
from typing import List, Optional
from enum import Enum
class IdentityRelationType(str, Enum):
SAME_PERSON_AS = "SAME_PERSON_AS"
LIKELY_SAME_PERSON = "LIKELY_SAME_PERSON"
POSSIBLY_SAME_PERSON = "POSSIBLY_SAME_PERSON"
DIFFERENT_PERSON = "DIFFERENT_PERSON"
class IdentityResolutionEdge(Edge):
"""Custom edge for identity resolution"""
def __init__(
self,
source_node_uuid: str,
target_node_uuid: str,
relationship_type: IdentityRelationType,
confidence: float,
evidence: List[str],
valid_from: datetime,
valid_until: Optional[datetime] = None,
):
super().__init__(
source_node_uuid=source_node_uuid,
target_node_uuid=target_node_uuid,
)
self.relationship_type = relationship_type
self.confidence = confidence
self.evidence = evidence
self.valid_from = valid_from
self.valid_until = valid_until
def to_dict(self) -> dict:
"""Convert to dictionary for storage"""
return {
"source_uuid": self.source_node_uuid,
"target_uuid": self.target_node_uuid,
"relationship_type": self.relationship_type.value,
"confidence": self.confidence,
"evidence": self.evidence,
"valid_from": self.valid_from.isoformat(),
"valid_until": self.valid_until.isoformat() if self.valid_until else None,
}
@classmethod
def from_dict(cls, data: dict) -> "IdentityResolutionEdge":
"""Create from dictionary"""
return cls(
source_node_uuid=data["source_uuid"],
target_node_uuid=data["target_uuid"],
relationship_type=IdentityRelationType(data["relationship_type"]),
confidence=data["confidence"],
evidence=data["evidence"],
valid_from=datetime.fromisoformat(data["valid_from"]),
valid_until=datetime.fromisoformat(data["valid_until"]) if data.get("valid_until") else None,
)
Episode Processing¶
OSINT Episode Processor Base¶
# graphiti_osint/processors/osint_base.py
from abc import ABC, abstractmethod
from graphiti_core import Graphiti
from graphiti_core.observations import Episode
from typing import Dict, Any, List
from datetime import datetime
class OSINTEpisodeProcessor(ABC):
"""Base class for OSINT episode processors"""
def __init__(self, graphiti: Graphiti):
self.graphiti = graphiti
self.group_id = self.get_group_id()
@abstractmethod
def get_group_id(self) -> str:
"""Return the knowledge graph group ID"""
pass
@abstractmethod
async def process(self, episode_body: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Process an episode and return results"""
pass
async def add_episode(
self,
name: str,
episode_body: str,
source_description: str,
metadata: Dict[str, Any] = None
) -> str:
"""Add episode to knowledge graph"""
episode = Episode(
name=name,
episode_body=episode_body,
source_description=source_description,
metadata=metadata or {}
)
# This triggers Graphiti's entity extraction
await self.graphiti.add_episode(
name=name,
episode_body=episode_body,
source=EpisodeType.text,
source_description=source_description
)
return episode.uuid
Username Recon Processor¶
# graphiti_osint/processors/username_recon.py
from .osint_base import OSINTEpisodeProcessor
from ..entities.person import PersonEntity
from ..relationships.identity_resolution import IdentityResolutionEdge, IdentityRelationType
from typing import Dict, Any
class UsernameReconProcessor(OSINTEpisodeProcessor):
"""Process username enumeration episodes"""
def get_group_id(self) -> str:
return "osint-usernames"
async def process(self, episode_body: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Process username enumeration results"""
username = metadata.get("username")
results = metadata.get("results", {})
# Create person entity
person = PersonEntity(
uuid=f"person_{self._hash_string(username)}",
primary_name=results.get("name") or username,
given_name=results.get("first_name", ""),
family_name=results.get("last_name", ""),
usernames={
platform: account["username"]
for platform, account in results.get("accounts", {}).items()
},
emails=results.get("emails", []),
confidence=results.get("confidence", 70.0),
sources=results.get("sources", []),
created_at=datetime.now(),
valid_from=datetime.now()
)
# Add episode to knowledge graph
await self.add_episode(
name=f"username_recon_{username}",
episode_body=person.to_graphiti_episode(),
source_description="OSINT Username Enumeration"
)
# Link accounts to person
for platform, account_data in results.get("accounts", {}).items():
await self._link_account_to_person(
username,
platform,
account_data,
person.uuid
)
return {
"person_uuid": person.uuid,
"accounts_linked": len(results.get("accounts", {})),
"confidence": person.confidence
}
async def _link_account_to_person(
self,
username: str,
platform: str,
account_data: Dict[str, Any],
person_uuid: str
):
"""Create account entity and link to person"""
account_uuid = f"account_{platform}_{self._hash_string(username)}"
# Account episode
account_episode = f"""
Account on {platform}
Username: {username}
URL: {account_data.get('url')}
Display Name: {account_data.get('display_name')}
Followers: {account_data.get('followers', 0)}
Platform Verified: {account_data.get('verified', False)}
"""
await self.graphiti.add_episode(
name=f"account_{platform}_{username}",
episode_body=account_episode,
source_description=f"OSINT Account Data from {platform}"
)
# Create identity resolution relationship
edge = IdentityResolutionEdge(
source_node_uuid=account_uuid,
target_node_uuid=person_uuid,
relationship_type=IdentityRelationType.SAME_PERSON_AS,
confidence=account_data.get("confidence", 90.0),
evidence=[f"Profile on {platform}"],
valid_from=datetime.now()
)
# Store edge (implementation depends on Graphiti version)
# await self.graphiti.add_edge(edge)
def _hash_string(self, s: str) -> str:
"""Generate consistent hash for string"""
import hashlib
return hashlib.sha256(s.encode()).hexdigest()[:16]
Knowledge Graph Queries¶
Cypher Query Examples¶
// Find all identities for a person across platforms
MATCH (p:Person {primary_name: "John Smith"})
MATCH (p)-[:HAS_ACCOUNT]->(a:Account)
RETURN p, a.platform, a.username, a.url
ORDER BY a.followers_count DESC
// Identity resolution: Find likely same persons
MATCH (p1:Person)-[:LIKELY_SAME_PERSON]->(p2:Person)
WHERE p1.confidence > 70
RETURN p1, p2, p1.uuid as source
LIMIT 20
// Corporate structure exploration
MATCH (o:Organization {legal_name: "Acme Corp"})
OPTIONAL MATCH (o)<-[:PARENT_OF]-(sub:Organization)
OPTIONAL MATCH (o)<-[:FUNDED_BY]-(i:Investor)
OPTIONAL MATCH (o)<-[:DIRECTOR_OF]-(d:Person)
RETURN o,
collect(DISTINCT sub.legal_name) as subsidiaries,
collect(DISTINCT i.name) as investors,
collect(DISTINCT d.primary_name) as directors
// Temporal: What did we know about person X at a specific time?
MATCH (p:Person {uuid: "person_abc123"})
WHERE datetime(p.valid_from) <= datetime("2023-06-01")
AND (p.valid_until IS NULL OR datetime(p.valid_until) >= datetime("2023-06-01"))
RETURN p.primary_name, p.emails, p.employers
// Cross-platform identity correlation
MATCH (a1:Account {platform: "twitter"})-[r:LIKELY_SAME_PERSON]->(a2:Account {platform: "github"})
RETURN a1.username, a2.username, r.confidence, r.evidence
ORDER BY r.confidence DESC
// Investigation pivot tracking
MATCH (i:Investigation {id: "OSINT-INV-2026-001"})
MATCH (i)-[:GENERATED_PIVOT]->(pivot:Pivot)
MATCH (pivot)-[:DISCOVERED_IN]->(entity)
RETURN pivot.pivot_type, entity.uuid, entity.entity_type, pivot.priority
ORDER BY pivot.priority DESC
Integration with OSINT Workflows¶
Workflow Integration Pattern¶
# In OSINT workflow files, add knowledge graph integration
## Knowledge Graph Integration
After collecting OSINT findings, store to knowledge graph:
```python
from graphiti_osint import OSINTGraphConfig, UsernameReconProcessor
async def store_to_knowledge_graph(findings: dict):
"""Store OSINT findings to Graphiti knowledge graph"""
graphiti = OSINTGraphConfig.get_graphiti()
processor = UsernameReconProcessor(graphiti)
results = await processor.process(
episode_body=generate_episode_body(findings),
metadata={
"username": findings["username"],
"results": findings["accounts"]
}
)
return results
Workflow Example: UsernameRecon.md Enhancement¶
## Knowledge Graph Integration
**Store findings to Graphiti knowledge graph for:**
- Cross-investigation correlation
- Identity resolution across cases
- Temporal tracking of entity changes
- Relationship discovery and analysis
**Episode Structure:**
Target: {username} Scan Date: {timestamp}
Accounts Found: - Platform: {platform} Username: {username} URL: {url} Followers: {count} Verified: {bool}
Confirmed Accounts: {count} Confidence: {confidence}
Metadata: - Investigation: {investigation_id} - Depth: {depth} - Sources: {sources}
group_id: "osint-usernames"
Testing¶
Entity Tests¶
# tests/test_entities.py
import pytest
from graphiti_osint.entities.person import PersonEntity
from datetime import datetime
def test_person_entity_creation():
"""Test Person entity creation"""
person = PersonEntity(
uuid="person_test123",
primary_name="John Smith",
given_name="John",
family_name="Smith",
usernames={"twitter": "@jsmith", "github": "jsmith-dev"},
emails=["john@example.com"],
confidence=85.0,
sources=["Twitter", "GitHub"],
created_at=datetime.now(),
valid_from=datetime.now()
)
assert person.primary_name == "John Smith"
assert person.confidence == 85.0
assert len(person.usernames) == 2
assert "@jsmith" in person.usernames["twitter"]
def test_person_to_graphiti_episode():
"""Test conversion to Graphiti episode format"""
person = PersonEntity(
uuid="person_test123",
primary_name="John Smith",
given_name="John",
family_name="Smith",
usernames={"twitter": "@jsmith"},
confidence=85.0,
sources=["Twitter"],
created_at=datetime.now(),
valid_from=datetime.now()
)
episode = person.to_graphiti_episode()
assert "Person: John Smith" in episode
assert "twitter: @jsmith" in episode
assert "Confidence: 85.0" in episode
Troubleshooting¶
Common Issues¶
Issue: Entities not being extracted - Solution: Check episode format - ensure clear entity names and attributes
Issue: Relationships not being created - Solution: Verify both source and target entities exist before creating edges
Issue: Confidence scores not calculating - Solution: Check confidence factors are being passed correctly
Issue: Temporal queries returning incorrect results - Solution: Verify valid_from and valid_until timestamps are set correctly
Best Practices¶
- Always set confidence scores - Every entity and relationship should have a confidence value
- Track provenance - Always record where data came from
- Use temporal metadata - Set valid_from and valid_until for time-bound facts
- Group related entities - Use consistent group_id for related investigations
- Test with real data - Use actual OSINT findings to validate schema design
- Iterate on schema - Start with core entities, expand as needed
Version: 1.0.0 Last Updated: 2026-02-04 Related Docs: OSINT_ONTOLOGY.md, KNOWLEDGE_GROUPS.md