
Commit 66cee1f

Implement Notion Correction Import feature
- Added a new service to read user corrections from Notion databases and apply them to the knowledge graph.
- Introduced endpoints for initiating correction imports and checking their status, with detailed request and response models.
- Implemented a multi-step pipeline for the correction process, including database discovery, row scanning, and applying corrections.
- Enhanced observability with OpenTelemetry spans for each pipeline step.
- Updated README.md to document the new Notion Correction Import feature and its usage.
- Created an in-memory job store to track correction import progress.
1 parent 764a908 commit 66cee1f

File tree: 8 files changed, +1451 −12 lines

README.md
Lines changed: 235 additions & 3 deletions
@@ -15,6 +15,7 @@
 - [Technology Stack](#technology-stack)
 - [Observability in Axiom](#observability-in-axiom)
 - [Notion Export](#notion-export)
+- [Notion Correction Import](#notion-correction-import)
 - [Setup & Deployment](#setup--deployment)
 
 ---
@@ -70,10 +71,17 @@ The system is built on [Graphiti](https://github.com/getzep/graphiti), a tempora
 - **Dynamic Schema Design**: Gemini analyzes the graph and designs 3-10 category databases with optimal column schemas
 - **MCP Agent Integration**: Uses the Notion MCP server via `create_react_agent` for flexible row creation and page building
 - **Async with Polling**: Fire-and-forget pattern (202 + status polling) with step-level progress tracking
-- **Feedback Loop Columns**: Every database includes "Needs Review" (checkbox) and "Correction Notes" (rich_text) for future graph corrections
+- **Clean Page on Export**: Automatically removes all existing content under the parent page before each export
+- **Feedback Loop Columns**: Every database includes "Needs Review" (checkbox) and "Correction Notes" (rich_text) for corrections
 - **Per-Request Auth**: Notion token is passed per-request (not server-side) for multi-tenant use
 
-### 7. 🔐 Security & Rate Limiting
+### 7. 🔄 Notion Correction Import
+- **Feedback Loop**: Reads user corrections from exported Notion databases and applies them back to the knowledge graph
+- **Smart Row Updates**: MCP agent intelligently updates affected columns or deletes rows that are no longer relevant
+- **Graphiti Integration**: Uses `add_episode()` with `custom_extraction_instructions` to ensure corrections are language-consistent and properly contradict outdated edges
+- **Partial Failure Handling**: Individual correction failures don't block the pipeline; detailed failure reports included in the result
+
+### 8. 🔐 Security & Rate Limiting
 - **API Key Authentication**: All endpoints (except `/health`) require `X-API-SECRET` header
 - **Concurrency Control**: Configurable semaphore limits to avoid LLM rate limits (429 errors)
 - **CORS Middleware**: Supports cross-origin requests for web frontends
@@ -345,10 +353,11 @@ Instead of direct CRUD operations (which break embeddings), corrections are proc
 ### 6. **Notion Export Service** (`app/services/notion_export.py`)
 **Purpose**: Export a user's knowledge graph into structured Notion databases
 
-**Pipeline** (5 sequential steps, each with its own OTel span):
+**Pipeline** (6 sequential steps, each with its own OTel span):
 
 | Step | Name | Method | Description |
 |------|------|--------|-------------|
+| 0 | Clean Page | Notion SDK (`blocks.children.list` + `DELETE`) | Remove all existing content under the parent page |
 | 1 | Hydrate | `HydrationService.build_user_knowledge(v1)` | Build the full graph compilation from Neo4j |
 | 2a | Analyze Schemas | Gemini structured output (`SchemaResult`) | Design 3-10 category databases with column schemas |
 | 2b | Extract Entries | Gemini structured output (`ExtractionResult`) | Extract rows for each category (one LLM call per category) |
@@ -360,6 +369,24 @@ Instead of direct CRUD operations (which break embeddings), corrections are proc
 
 **Job Store** (`app/services/notion_export_job_store.py`): Tracks `current_step`, `categories_count`, `entries_count`, `database_ids`, and `summary_page_url`. Same in-memory pattern as the ingest job store.
 
+### 7. **Notion Correction Service** (`app/services/notion_correction.py`)
+**Purpose**: Read user corrections from Notion and apply them back to the knowledge graph
+
+**Pipeline** (3 steps, each with its own OTel span):
+
+| Step | Name | Method | Description |
+|------|------|--------|-------------|
+| 0 | Discover Databases | Notion SDK (`blocks.children.list`) | Find all child databases under the parent page |
+| 1 | Scan Flagged Rows | Notion SDK (`databases/{id}/query`) | Query each database for rows with "Needs Review" checked |
+| 2a | Correct Graph | `graphiti.add_episode()` | Apply correction with `custom_extraction_instructions` for language + contradiction handling |
+| 2b | Update Notion Row | MCP agent (`create_react_agent`) | Intelligently update affected columns or delete the row if no longer relevant |
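Step 0's database discovery can be sketched against the Notion SDK's `blocks.children.list` endpoint. A minimal, duck-typed sketch (`client` would be a `notion_client.Client` in practice; function and variable names are illustrative, not the service's actual code):

```python
# Hypothetical sketch of Step 0 (Discover Databases). The `client` is
# duck-typed: in practice it would be a notion_client.Client whose
# blocks.children.list endpoint pages through a block's children.
from typing import Any

def discover_databases(client: Any, page_id: str) -> dict[str, str]:
    """Map category name -> database ID for each child_database block."""
    mapping: dict[str, str] = {}
    cursor: str | None = None
    while True:
        kwargs = {"block_id": page_id}
        if cursor:
            kwargs["start_cursor"] = cursor
        resp = client.blocks.children.list(**kwargs)
        for block in resp["results"]:
            if block["type"] == "child_database":
                # Title of the inline database, e.g. "Medications"
                mapping[block["child_database"]["title"]] = block["id"]
        if not resp.get("has_more"):
            return mapping
        cursor = resp.get("next_cursor")
```

The resulting name → ID map drives the per-database queries in Step 1.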
+
+**Correction Strategy**: Each correction is processed as a new Graphiti episode. The `custom_extraction_instructions` steer the LLM to extract only corrected facts (not the old state), and to write everything in the user's preferred language. Graphiti's built-in contradiction detection automatically invalidates outdated edges.
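As an illustration of this strategy, the episode body and instructions for one flagged row might be assembled like this (field names and prompt wording are assumptions, not the service's actual code):

```python
# Hypothetical sketch of assembling the inputs for graphiti.add_episode();
# the real service's body format and instruction text may differ.
def build_correction_episode(
    category: str, properties: dict[str, str], note: str, language: str
) -> tuple[str, str]:
    """Return (episode_body, extraction_instructions) for one flagged row."""
    props = "\n".join(f"- {k}: {v}" for k, v in properties.items())
    body = (
        f"Category: {category}\n"
        f"Current (possibly outdated) properties:\n{props}\n"
        f"User correction: {note}"
    )
    instructions = (
        "Extract only the corrected facts, not the outdated state. "
        f"Write all extracted text in {language}. "
        "State new facts so they contradict the outdated ones, "
        "letting stale edges be invalidated."
    )
    return body, instructions

body, instructions = build_correction_episode(
    "Medications", {"Name": "Aspirin", "Dosage": "10mg"}, "Dosage is 20mg", "Spanish"
)
```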
+
+**MCP Agent Decision**: The agent receives the full context (current properties, column schema, updated node summaries, new facts, invalidated facts) and chooses to either update the row or archive it.
+
+**Job Store** (`app/services/notion_correction_job_store.py`): Tracks `current_step`, `databases_scanned`, `corrections_found`, `corrections_applied`, `corrections_failed`, and `failed_corrections` (per-row error details).
+
 ---
 
 ## API Endpoints
@@ -697,6 +724,85 @@ Pipeline steps reported via `currentStep`: `"hydrating"` → `"analyzing"` → `
 
 ---
 
+#### 🔄 `POST /v1/notion/corrections`
+**Purpose**: Import user corrections from Notion databases back into the knowledge graph
+**Auth**: ✅ Required (`X-API-SECRET`)
+**Status**: `202 Accepted`
+**Request Body**:
+```json
+{
+  "userId": "user-123",
+  "notionToken": "ntn_your_token_here",
+  "pageName": "Synapse",
+  "language": "Spanish"
+}
+```
+
+**Response**:
+```json
+{
+  "jobId": "d1e2f3a4-...",
+  "status": "processing",
+  "pageId": "resolved-page-id"
+}
+```
+
+**Client Flow**: Poll `GET /v1/notion/corrections/status/{jobId}` for progress and results.
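A client-side polling loop for this flow might look like the following sketch; `fetch_status` stands in for an authenticated HTTP GET against the status endpoint, and all names are illustrative:

```python
# Hypothetical client polling loop for the 202 + status-poll pattern.
# `fetch_status` abstracts the HTTP GET to
# /v1/notion/corrections/status/{jobId} (with the X-API-SECRET header).
import time
from typing import Callable

def poll_job(fetch_status: Callable[[str], dict], job_id: str,
             interval_s: float = 2.0, timeout_s: float = 300.0) -> dict:
    """Poll until the job reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        payload = fetch_status(job_id)
        if payload["status"] in ("completed", "failed"):
            return payload
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} did not finish within {timeout_s}s")
```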
+
+---
+
+#### 📊 `GET /v1/notion/corrections/status/{job_id}`
+**Purpose**: Poll for Notion correction import job status
+**Auth**: ✅ Required (`X-API-SECRET`)
+
+**Response** (processing):
+```json
+{
+  "jobId": "d1e2f3a4-...",
+  "status": "processing",
+  "progress": {
+    "currentStep": "applying",
+    "databasesScanned": 7,
+    "correctionsFound": 3,
+    "correctionsApplied": 1,
+    "correctionsFailed": 0
+  }
+}
+```
+
+Pipeline steps reported via `currentStep`: `"scanning"` → `"applying"` → `"done"`.
+
+**Response** (completed):
+```json
+{
+  "jobId": "d1e2f3a4-...",
+  "status": "completed",
+  "result": {
+    "correctionsFound": 3,
+    "correctionsApplied": 2,
+    "correctionsFailed": 1,
+    "failedCorrections": [
+      {"category": "Medications", "title": "Aspirin", "error": "LLM rate limit exceeded"}
+    ],
+    "durationMs": 95000.5
+  }
+}
+```
+
+**Response** (failed):
+```json
+{
+  "jobId": "d1e2f3a4-...",
+  "status": "failed",
+  "error": "No databases found under the specified Notion page.",
+  "code": "NO_DATABASES"
+}
+```
+
+**Note**: Terminal states (`"completed"` or `"failed"`) remove the job from memory after the response is returned.
+
+---
+
 ## Notion Export
 
 ### Overview
@@ -717,6 +823,11 @@ The Notion Export feature lets you export a user's entire knowledge graph into s
    └─▶ Return 202 {jobId, pageId}
 
 3. BACKGROUND PIPELINE (NotionExportService)
+
+   ├─▶ Step 0: CLEAN PAGE
+   │     └─▶ List all child blocks under the parent page
+   │     └─▶ Delete each block (databases, summaries, etc.)
+   │     → Ensures a fresh page for every export
 
    ├─▶ Step 1: HYDRATE
    │     └─▶ HydrationService.build_user_knowledge(userId, v1)
@@ -820,6 +931,126 @@ Per-step latency breakdown:
 
 ---
 
+## Notion Correction Import
+
+### Overview
+
+The Notion Correction Import feature closes the feedback loop: users flag incorrect data in the exported Notion databases, and the system reads those corrections, applies them to the knowledge graph, and intelligently updates or deletes the Notion rows.
+
+Each exported database includes two feedback columns:
+- **Needs Review** (checkbox): Flag a row that needs correction
+- **Correction Notes** (rich_text): Describe what needs to be fixed
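The later scan for flagged rows boils down to a checkbox filter on each database. A duck-typed sketch, assuming the notion-client SDK's `databases.query` filter shape (helper names are illustrative):

```python
# Hypothetical sketch of scanning one database for rows whose
# "Needs Review" checkbox is checked. `client` is duck-typed; in practice
# it would be a notion_client.Client.
from typing import Any

NEEDS_REVIEW_FILTER = {
    "property": "Needs Review",
    "checkbox": {"equals": True},
}

def scan_flagged_rows(client: Any, database_id: str) -> list[dict]:
    """Return the pages (rows) flagged for review in one database."""
    resp = client.databases.query(database_id=database_id,
                                  filter=NEEDS_REVIEW_FILTER)
    return resp["results"]
```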
+
+### How It Works
+
+```
+1. CLIENT
+   └─▶ POST /v1/notion/corrections
+       {userId, notionToken, pageName, language}
+
+2. ROUTE HANDLER (synchronous validation)
+   ├─▶ Validate notionToken by resolving pageName → pageId
+   ├─▶ Create job in memory store (status: "processing")
+   ├─▶ Launch background task (asyncio.create_task)
+   └─▶ Return 202 {jobId, pageId}
+
+3. BACKGROUND PIPELINE (NotionCorrectionService)
+
+   ├─▶ Step 0: DISCOVER DATABASES
+   │     └─▶ List child_database blocks under the parent page
+   │     → Map of category name → database ID
+
+   ├─▶ Step 1: SCAN FOR FLAGGED ROWS
+   │     └─▶ Query each database filtering "Needs Review" == true
+   │     └─▶ Extract property values, types, and correction notes
+   │     → List of CorrectionItems
+
+   └─▶ Step 2: APPLY CORRECTIONS (per row, sequentially)
+
+       ├─▶ 2a. CORRECT GRAPH (Graphiti)
+       │     └─▶ graphiti.add_episode() with:
+       │         - Episode body containing old properties + user correction
+       │         - custom_extraction_instructions for language + correction handling
+       │     → Returns AddEpisodeResults (updated nodes, edges, invalidated facts)
+
+       └─▶ 2b. UPDATE NOTION ROW (MCP agent)
+             └─▶ LangGraph ReAct agent with Notion MCP tools decides:
+                 - OPTION A: Update row (patch affected properties, uncheck flag)
+                 - OPTION B: Delete row (archive if entity is no longer relevant)
+                 Agent receives: current properties, column schema, node summaries,
+                 new facts, invalidated facts, and user correction notes.
+
+4. CLIENT (polling loop)
+   └─▶ GET /v1/notion/corrections/status/{jobId}
+       ├─▶ {status: "processing", progress: {currentStep, correctionsApplied, ...}}
+       ├─▶ {status: "completed", result: {correctionsFound, correctionsApplied, correctionsFailed, ...}}
+       └─▶ {status: "failed", error, code}
+```
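Steps 2-3 of the flow above follow the fire-and-forget pattern: register a job, launch the pipeline as a background task, return immediately. A stripped-down asyncio sketch (names are illustrative; the real handler also validates the token and resolves the page first):

```python
# Minimal sketch of the fire-and-forget 202 pattern with
# asyncio.create_task; stand-ins, not the actual route or service code.
import asyncio
import uuid

jobs: dict[str, dict] = {}

async def run_pipeline(job_id: str) -> None:
    """Stand-in for NotionCorrectionService's background pipeline."""
    jobs[job_id]["progress"] = {"currentStep": "scanning"}
    await asyncio.sleep(0)  # real steps would await Notion/Graphiti I/O here
    jobs[job_id].update(status="completed", result={"correctionsApplied": 0})

async def start_correction_import() -> dict:
    """What the route handler would return alongside a 202 status."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "processing"}
    asyncio.create_task(run_pipeline(job_id))  # fire and forget
    return {"jobId": job_id, "status": "processing"}
```

The handler never awaits the pipeline, which is why clients must poll the status endpoint for the outcome.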
+
+### Example: Full Correction Flow
+
+```bash
+# 1. Start the correction import
+curl -X POST http://localhost:8000/v1/notion/corrections \
+  -H "Content-Type: application/json" \
+  -H "X-API-SECRET: your_secret" \
+  -d '{
+    "userId": "user-123",
+    "notionToken": "ntn_your_token_here",
+    "pageName": "Synapse",
+    "language": "Spanish"
+  }'
+# → 202 {"jobId": "def-456", "status": "processing", "pageId": "..."}
+
+# 2. Poll for status
+curl http://localhost:8000/v1/notion/corrections/status/def-456 \
+  -H "X-API-SECRET: your_secret"
+# → {"status": "processing", "progress": {"currentStep": "applying", "correctionsApplied": 1, ...}}
+
+# 3. Final result
+# → {"status": "completed", "result": {"correctionsFound": 3, "correctionsApplied": 2, "correctionsFailed": 1, ...}}
+```
+
+### How the MCP Agent Decides
+
+The agent receives the full context of each correction and makes an intelligent decision:
+
+- **Update**: If the correction modifies specific facts (e.g., "the dosage changed to 20mg"), the agent patches the relevant columns while keeping unaffected properties intact
+- **Delete**: If the correction invalidates the entity entirely (e.g., "this concept is no longer relevant"), the agent archives the row
+
+The `language` parameter ensures all updated property values are written in the user's preferred language.
+
+### Axiom Observability
+
+The correction pipeline emits spans under the `correction.*` attribute namespace:
+
+```apl
+['synapse-cortex-traces']
+| where startswith(name, 'notion_correction.')
+| summarize
+    total = count(),
+    avg_duration = avg(['attributes.duration_ms']),
+    avg_found = avg(['attributes.correction.found']),
+    avg_applied = avg(['attributes.correction.applied']),
+    avg_failed = avg(['attributes.correction.failed'])
+  by bin_auto(_time)
+```
+
+Per-step breakdown (graph correction vs Notion row update):
+
+```apl
+['synapse-cortex-traces']
+| where startswith(name, 'notion_correction.')
+| summarize
+    avg_ms = avg(['attributes.duration_ms']),
+    p95_ms = percentile(['attributes.duration_ms'], 95),
+    calls = count()
+  by name
+| order by avg_ms desc
+```
+
+---
+
 ## Data Flow
 
 ### Ingestion Pipeline (Async with Polling)
@@ -963,6 +1194,7 @@ The API now emits structured OpenTelemetry attributes for request-level and serv
 - `ingest.*`: job lifecycle, counts, processing metadata
 - `hydrate.*`: hydration request context and output size
 - `export.*`: Notion export pipeline steps, categories, entries, database counts
+- `correction.*`: Notion correction import pipeline, corrections found/applied/failed
 - `graph.*`: graph retrieval/correction context and result counts
 - `db.*`: Neo4j query type, records returned, query latency
 - `upstream.*`: upstream status/error hints for Gemini and HTTP calls

app/api/dependencies.py
Lines changed: 7 additions & 0 deletions

@@ -13,6 +13,7 @@
 from app.services.graph import GraphService
 from app.services.hydration import HydrationService
 from app.services.ingestion import IngestionService
+from app.services.notion_correction import NotionCorrectionService
 from app.services.notion_export import NotionExportService
 
 
@@ -50,10 +51,16 @@ def get_notion_export_service(request: Request) -> NotionExportService:
     return request.app.state.notion_export_service
 
 
+def get_notion_correction_service(request: Request) -> NotionCorrectionService:
+    """Get the Notion correction import service from app state."""
+    return request.app.state.notion_correction_service
+
+
 # Type aliases for service dependencies
 HydrationServiceDep = Annotated[HydrationService, Depends(get_hydration_service)]
 IngestionServiceDep = Annotated[IngestionService, Depends(get_ingestion_service)]
 GenerationServiceDep = Annotated[GenerationService, Depends(get_generation_service)]
 GraphServiceDep = Annotated[GraphService, Depends(get_graph_service)]
 GraphitiDep = Annotated[Graphiti, Depends(get_graphiti)]
 NotionExportServiceDep = Annotated[NotionExportService, Depends(get_notion_export_service)]
+NotionCorrectionServiceDep = Annotated[NotionCorrectionService, Depends(get_notion_correction_service)]
