Commit 1cb20b6

Input docs API parameter (#2034)
* Add optional input_documents to index API
* Semver
* Add input dataframe example notebook
* Format
* Fix docs and notebook
1 parent 2030f94 commit 1cb20b6

File tree

5 files changed: +221 additions, -6 deletions

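
For orientation before the per-file diffs, here is a minimal sketch of the call this commit enables. It is a sketch only: the project directory and document values are placeholders, the required columns (id, title, text, creation_date) are taken from the example notebook added in this commit, and the snippet assumes it runs inside an async context such as a notebook.

from pathlib import Path

import pandas as pd

import graphrag.api as api
from graphrag.config.load_config import load_config

# Placeholder documents; real ids/titles/text come from your own data source.
docs = pd.DataFrame([
    {
        "id": "doc-1",
        "title": "example.txt",
        "text": "GraphRAG can now index documents handed to it as an in-memory dataframe.",
        "creation_date": "2024-01-01",
    }
])

# Placeholder project directory containing a standard settings.yaml.
config = load_config(Path("./my_project"))

# New in this commit: input_documents bypasses the document loading/parsing workflow.
results = await api.build_index(config=config, input_documents=docs, is_update_run=False)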
Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+{
+  "type": "minor",
+  "description": "Add optional input documents to indexing API."
+}
Lines changed: 194 additions & 0 deletions

New example notebook (graphrag kernel, Python 3.12.10, nbformat 4):

# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License.

## Example of indexing from an existing in-memory dataframe

Newer versions of GraphRAG let you submit a dataframe directly instead of running through the input processing step. This notebook demonstrates how to do so with either regular or update runs.

If performing an update, the assumption is that your dataframe contains only the new documents to add to the index.

from pathlib import Path
from pprint import pprint

import pandas as pd

import graphrag.api as api
from graphrag.config.load_config import load_config
from graphrag.index.typing.pipeline_run_result import PipelineRunResult

PROJECT_DIRECTORY = "<your project directory>"
UPDATE = False
FILENAME = "new_documents.parquet" if UPDATE else "<original_documents>.parquet"
inputs = pd.read_parquet(f"{PROJECT_DIRECTORY}/input/{FILENAME}")
# Only the bare minimum for input. These are the same fields that would be present after the load_input_documents workflow
inputs = inputs.loc[:, ["id", "title", "text", "creation_date"]]

### Generate a `GraphRagConfig` object

graphrag_config = load_config(Path(PROJECT_DIRECTORY))

## Indexing API

*Indexing* is the process of ingesting raw text data and constructing a knowledge graph. GraphRAG currently supports plaintext (`.txt`) and `.csv` file formats.

## Build an index

index_result: list[PipelineRunResult] = await api.build_index(
    config=graphrag_config, input_documents=inputs, is_update_run=UPDATE
)

# index_result is a list of workflows that make up the indexing pipeline that was run
for workflow_result in index_result:
    status = f"error\n{workflow_result.errors}" if workflow_result.errors else "success"
    print(f"Workflow Name: {workflow_result.workflow}\tStatus: {status}")

## Query an index

To query an index, several index files must first be read into memory and passed to the query API.

entities = pd.read_parquet(f"{PROJECT_DIRECTORY}/output/entities.parquet")
communities = pd.read_parquet(f"{PROJECT_DIRECTORY}/output/communities.parquet")
community_reports = pd.read_parquet(
    f"{PROJECT_DIRECTORY}/output/community_reports.parquet"
)

response, context = await api.global_search(
    config=graphrag_config,
    entities=entities,
    communities=communities,
    community_reports=community_reports,
    community_level=2,
    dynamic_community_selection=False,
    response_type="Multiple Paragraphs",
    query="What are the top five themes of the dataset?",
)

The response object is the official response from graphrag, while the context object holds various metadata regarding the querying process used to obtain the final response.

print(response)

Digging into the context a bit more provides users with extremely granular information, such as which sources of data (down to the level of text chunks) were ultimately retrieved and used as part of the context sent to the LLM.

pprint(context)  # noqa: T203
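
A compact restatement of the notebook's update path (UPDATE = True), reusing the names defined above; new_documents.parquet is the notebook's own placeholder for a file containing only documents not yet in the index.

new_docs = pd.read_parquet(f"{PROJECT_DIRECTORY}/input/new_documents.parquet")
new_docs = new_docs.loc[:, ["id", "title", "text", "creation_date"]]

update_result = await api.build_index(
    config=graphrag_config, input_documents=new_docs, is_update_run=True
)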

graphrag/api/index.py

Lines changed: 6 additions & 6 deletions

@@ -11,14 +11,15 @@
 import logging
 from typing import Any
 
+import pandas as pd
+
 from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
 from graphrag.config.enums import IndexingMethod
 from graphrag.config.models.graph_rag_config import GraphRagConfig
 from graphrag.index.run.run_pipeline import run_pipeline
 from graphrag.index.run.utils import create_callback_chain
 from graphrag.index.typing.pipeline_run_result import PipelineRunResult
-from graphrag.index.typing.workflow import WorkflowFunction
 from graphrag.index.workflows.factory import PipelineFactory
 from graphrag.logger.standard_logging import init_loggers
 
@@ -33,6 +34,7 @@ async def build_index(
     callbacks: list[WorkflowCallbacks] | None = None,
     additional_context: dict[str, Any] | None = None,
     verbose: bool = False,
+    input_documents: pd.DataFrame | None = None,
 ) -> list[PipelineRunResult]:
     """Run the pipeline with the given configuration.
 
@@ -48,6 +50,8 @@
         A list of callbacks to register.
     additional_context : dict[str, Any] | None default=None
         Additional context to pass to the pipeline run. This can be accessed in the pipeline state under the 'additional_context' key.
+    input_documents : pd.DataFrame | None default=None
+        Override document loading and parsing and supply your own dataframe of documents to index.
 
     Returns
     -------
@@ -79,6 +83,7 @@
         callbacks=workflow_callbacks,
         is_update_run=is_update_run,
         additional_context=additional_context,
+        input_documents=input_documents,
     ):
         outputs.append(output)
         if output.errors and len(output.errors) > 0:
@@ -91,11 +96,6 @@
     return outputs
 
 
-def register_workflow_function(name: str, workflow: WorkflowFunction):
-    """Register a custom workflow function. You can then include the name in the settings.yaml workflows list."""
-    PipelineFactory.register(name, workflow)
-
-
 def _get_method(method: IndexingMethod | str, is_update_run: bool) -> str:
     m = method.value if isinstance(method, IndexingMethod) else method
     return f"{m}-update" if is_update_run else m
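
This file also drops the register_workflow_function wrapper. A hedged sketch of what a caller of the deleted helper might do instead, based only on the wrapper's one-line body shown above; this diff does not state whether PipelineFactory.register is intended as the supported replacement.

from graphrag.index.workflows.factory import PipelineFactory


def my_custom_workflow(config, context):
    # Hypothetical workflow function; the real WorkflowFunction signature is not shown in this diff.
    ...


# Mirrors the deleted wrapper's body: register under a name usable in the settings.yaml workflows list.
PipelineFactory.register("my_custom_workflow", my_custom_workflow)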

graphrag/index/run/run_pipeline.py

Lines changed: 13 additions & 0 deletions

@@ -11,6 +11,8 @@
 from dataclasses import asdict
 from typing import Any
 
+import pandas as pd
+
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
 from graphrag.config.models.graph_rag_config import GraphRagConfig
 from graphrag.index.run.utils import create_run_context
@@ -30,6 +32,7 @@ async def run_pipeline(
     callbacks: WorkflowCallbacks,
     is_update_run: bool = False,
     additional_context: dict[str, Any] | None = None,
+    input_documents: pd.DataFrame | None = None,
 ) -> AsyncIterable[PipelineRunResult]:
     """Run all workflows using a simplified pipeline."""
     root_dir = config.root_dir
@@ -60,6 +63,11 @@
 
         state["update_timestamp"] = update_timestamp
 
+        # if the user passes in a df directly, write directly to storage so we can skip finding/parsing later
+        if input_documents is not None:
+            await write_table_to_storage(input_documents, "documents", delta_storage)
+            pipeline.remove("load_update_documents")
+
         context = create_run_context(
             input_storage=input_storage,
             output_storage=delta_storage,
@@ -72,6 +80,11 @@
     else:
         logger.info("Running standard indexing.")
 
+        # if the user passes in a df directly, write directly to storage so we can skip finding/parsing later
+        if input_documents is not None:
+            await write_table_to_storage(input_documents, "documents", output_storage)
+            pipeline.remove("load_input_documents")
+
         context = create_run_context(
             input_storage=input_storage,
             output_storage=output_storage,

graphrag/index/typing/pipeline.py

Lines changed: 4 additions & 0 deletions

@@ -21,3 +21,7 @@
     def names(self) -> list[str]:
         """Return the names of the workflows in the pipeline."""
         return [name for name, _ in self.workflows]
+
+    def remove(self, name: str) -> None:
+        """Remove a workflow from the pipeline by name."""
+        self.workflows = [w for w in self.workflows if w[0] != name]
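
A minimal, self-contained illustration of the new remove() semantics, using a stand-in class rather than graphrag's actual Pipeline (whose constructor is not shown in this diff); "load_input_documents" is the workflow name referenced in run_pipeline above, and "build_graph" is a hypothetical second entry.

class TinyPipeline:
    """Stand-in mirroring Pipeline.workflows as a list of (name, function) pairs."""

    def __init__(self, workflows):
        self.workflows = workflows

    def names(self):
        return [name for name, _ in self.workflows]

    def remove(self, name):
        # Same list-comprehension filter the real method uses.
        self.workflows = [w for w in self.workflows if w[0] != name]


p = TinyPipeline([("load_input_documents", None), ("build_graph", None)])
p.remove("load_input_documents")
print(p.names())  # ['build_graph']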
