Skip to content

Commit 19aebe7

Browse files
Mohamed ShahinMohamed Shahin
authored andcommitted
new feature - add batch import from a local json file
1 parent 4f1d41c commit 19aebe7

File tree

4 files changed

+214
-1
lines changed

4 files changed

+214
-1
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
A powerful command-line interface for managing and interacting with Weaviate vector databases directly from your terminal.
99

1010
## Key Features
11-
- **Collections**: Create, update, delete and get collection configurations
11+
- **Collections**: Create, batch-import into, update, delete and get collection configurations
1212
- **Data Management**: Import, query, update and delete data with various search types (vector, keyword, hybrid)
1313
- **Multi-tenancy**: Manage tenants and their states across collections
1414
- **Backup & Restore**: Create and restore backups with support for S3, GCS and filesystem
@@ -34,13 +34,17 @@ weaviate-cli create collection --collection movies --vectorizer transformers
3434
# Import test data
3535
weaviate-cli create data --collection movies --limit 1000
3636

37+
# Batch-import data from a local JSON file
38+
weaviate-cli batch insert --collection <COL_NAME> --path <LOCAL_FILE_PATH.json> --vectorizer <VEC_NAME e.g. openai> --replication-factor 3
39+
3740
# Query data
3841
weaviate-cli query data --collection movies --search-type hybrid --query "action movies"
3942
```
4043

4144
## Core Commands
4245

4346
- **create**: Create collections, tenants, backups or import data
47+
- **batch**: Batch-import data into a collection from a local JSON file
4448
- **delete**: Remove collections, tenants or data
4549
- **update**: Modify collection settings, tenant states or data
4650
- **get**: Retrieve collection info, tenant details or shard status

cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from weaviate_cli.commands.query import query
99
from weaviate_cli.commands.restore import restore
1010
from weaviate_cli.commands.cancel import cancel
11+
from weaviate_cli.commands.batch import batch
1112
from weaviate_cli import __version__
1213

1314

@@ -51,6 +52,7 @@ def main(ctx: click.Context, config_file):
5152
main.add_command(restore)
5253
main.add_command(query)
5354
main.add_command(cancel)
55+
main.add_command(batch)
5456

5557
if __name__ == "__main__":
5658
main()

weaviate_cli/commands/batch.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import click
2+
import sys
3+
import os
4+
import json
5+
from weaviate_cli.utils import get_client_from_context
6+
from weaviate.exceptions import WeaviateConnectionError
7+
from weaviate_cli.defaults import CreateCollectionDefaults
8+
from weaviate_cli.managers.batch_manager import BatchManager
9+
10+
# Top-level "batch" command group. It performs no work itself; subcommands
# register on it through ``@batch.command(...)``.
# NOTE: the docstring below is rendered by click as the group's help text.
@click.group()
def batch() -> None:
    """Batch operations in Weaviate."""
14+
15+
@batch.command("insert")
@click.option(
    "--collection",
    required=True,
    help="The name of the collection (class) to insert data into.",
)
@click.option(
    "--path",
    required=True,
    type=str,
    help="Path to the JSON file containing the data.",
)
@click.option(
    "--vectorizer",
    default=CreateCollectionDefaults.vectorizer,
    type=click.Choice(
        ["contextionary", "transformers", "openai", "ollama", "cohere", "jinaai"]
    ),
    help="Vectorizer to use.",
)
@click.option(
    "--shards",
    default=1,
    type=int,
    help="Number of shards for the collection (default: 1).",
)
@click.option(
    "--replication-factor",
    default=1,
    type=int,
    help="Replication factor for the collection (default: 1).",
)
@click.pass_context
def batch_insert_cli(
    ctx: click.Context,
    collection: str,
    path: str,
    vectorizer: str,
    shards: int,
    replication_factor: int,
) -> None:
    """
    Insert data into a Weaviate collection (class) in batch mode.
    """
    # NOTE: the docstring above is rendered by click as the command's help
    # text, so implementation notes live in comments instead.
    #
    # Flow: validate the input file -> load and shape-check the JSON ->
    # connect -> ensure the collection exists -> batch-insert the records.
    # Every failure prints an "Error: ..." line and exits with status 1.

    # Validate the file path and extension before touching the network.
    if not os.path.isfile(path):
        click.echo(f"Error: The file {path} does not exist.")
        sys.exit(1)
    # Compare case-insensitively so that e.g. "DATA.JSON" is accepted too.
    if not path.lower().endswith(".json"):
        click.echo("Error: The file must have a .json extension.")
        sys.exit(1)

    # Load the JSON data. "utf-8-sig" reads plain UTF-8 and also strips a
    # leading BOM, instead of depending on the platform's locale encoding.
    try:
        with open(path, "r", encoding="utf-8-sig") as file:
            data = json.load(file)
    except json.JSONDecodeError:
        click.echo(f"Error: The file {path} is not a valid JSON file.")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as read_error:
        # The file can still be unreadable (permissions, removed after the
        # isfile() check, wrong encoding); report it rather than crash.
        click.echo(f"Error: The file {path} could not be read: {read_error}")
        sys.exit(1)

    # Validate JSON structure: the importer expects a list of objects.
    if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data):
        click.echo(
            "Error: The JSON file must contain a list of objects (e.g., [{...}, {...}])."
        )
        sys.exit(1)

    # Initialize the Weaviate client. It stays None until the connection
    # succeeds so the finally-clause only closes a client that was opened.
    client = None
    try:
        client = get_client_from_context(ctx)
        batch_manager = BatchManager(client)

        # Create the collection (if it doesn't exist)
        click.echo(f"Ensuring collection '{collection}' exists...")
        batch_manager.create_collection(
            collection=collection,
            vectorizer=vectorizer,
            shards=shards,
            replication_factor=replication_factor,
            force_auto_schema=True,
        )

        # Insert the data in batch mode
        click.echo(f"Inserting data into collection '{collection}'...")
        batch_manager.batch_insert(collection, data)

    except WeaviateConnectionError as wce:
        click.echo(f"Connection error: {wce}")
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: turn any remaining failure into a short
        # message and a non-zero exit status.
        click.echo(f"Error: {e}")
        sys.exit(1)
    finally:
        # Runs on the sys.exit() paths as well.
        if client:
            client.close()
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import click
2+
from typing import Dict, List, Optional
3+
import weaviate.classes.config as wvc
4+
from weaviate.client import WeaviateClient
5+
6+
7+
class BatchManager:
    """Create collections and bulk-load objects through a Weaviate client."""

    def __init__(self, client: WeaviateClient) -> None:
        """Store the client used by all operations.

        The manager never closes the client; that is left to the caller.
        """
        self.client = client

    def create_collection(
        self,
        collection: str,
        vectorizer: str = "contextionary",
        shards: int = 1,
        replication_factor: int = 1,
        force_auto_schema: bool = True,
    ) -> None:
        """
        Create a collection dynamically for batch insertion.

        If the collection already exists, a notice is printed and nothing
        else happens (the remaining arguments are not validated).

        Args:
            collection (str): Name of the collection to create.
            vectorizer (str): Vectorizer type (e.g., openai, transformers).
            shards (int): Number of shards for the collection.
            replication_factor (int): Replication factor for the collection.
            force_auto_schema (bool): When True, no properties are declared
                (``properties=None``); when False an empty property list is
                sent.

        Raises:
            ValueError: If ``vectorizer`` is not one of the supported names.
            Exception: If the create call fails; the original error is
                chained as ``__cause__``.
        """
        if self.client.collections.exists(collection):
            click.echo(f"Collection '{collection}' already exists. Skipping creation.")
            return

        # Map vectorizer names to factories rather than ready-made configs,
        # so only the requested configuration is ever constructed.
        vectorizer_factories = {
            "contextionary": wvc.Configure.Vectorizer.text2vec_contextionary,
            "transformers": wvc.Configure.Vectorizer.text2vec_transformers,
            "openai": wvc.Configure.Vectorizer.text2vec_openai,
            "ollama": lambda: wvc.Configure.Vectorizer.text2vec_ollama(
                model="snowflake-arctic-embed:33m"
            ),
            "cohere": wvc.Configure.Vectorizer.text2vec_cohere,
            "jinaai": wvc.Configure.Vectorizer.text2vec_jinaai,
        }

        # Validate vectorizer
        if vectorizer not in vectorizer_factories:
            raise ValueError(
                f"Invalid vectorizer '{vectorizer}'. Choose from: {list(vectorizer_factories)}"
            )
        vectorizer_config = vectorizer_factories[vectorizer]()

        try:
            # Create collection with configuration
            self.client.collections.create(
                name=collection,
                vector_index_config=wvc.Configure.VectorIndex.hnsw(),
                replication_config=wvc.Configure.replication(
                    factor=replication_factor,
                    async_enabled=False,
                    deletion_strategy=wvc.ReplicationDeletionStrategy.DELETE_ON_CONFLICT,
                ),
                sharding_config=wvc.Configure.sharding(desired_count=shards),
                vectorizer_config=vectorizer_config,
                properties=None if force_auto_schema else [],
            )
            click.echo(
                f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'."
            )
        except Exception as e:
            # Keep the original exception chained for debugging.
            raise Exception(f"Error creating collection '{collection}': {e}") from e

    def batch_insert(
        self,
        collection: str,
        data: List[Dict],
    ) -> None:
        """
        Insert data into a collection in batch.

        The input records are not modified. After the batch finishes, a
        summary is printed: either every failed object or a success line.

        Args:
            collection (str): Name of the collection.
            data (List[Dict]): Data to be inserted, one dict of properties
                per object.

        Raises:
            Exception: If the collection does not exist, or if the batch
                fails (the original error is chained as ``__cause__``).
        """
        if not self.client.collections.exists(collection):
            raise Exception(f"Collection '{collection}' does not exist. Cannot insert data.")

        try:
            # Perform batch insertion using Weaviate's dynamic batch
            with self.client.batch.dynamic() as batch:
                for record in data:
                    # 'id' is a reserved key: the client rejects it with
                    # WeaviateInsertInvalidPropertyError. Build a filtered
                    # copy so the caller's dict is left untouched.
                    properties = {
                        key: value for key, value in record.items() if key != "id"
                    }

                    # Add the object to the batch
                    batch.add_object(
                        collection=collection,
                        properties=properties,
                    )
                    click.echo("Processed record")
        except Exception as e:
            raise Exception(f"Batch insertion failed: {e}") from e

        # Check for failed objects
        failed_objects = self.client.batch.failed_objects
        if failed_objects:
            click.echo(f"Number of failed objects: {len(failed_objects)}")
            for i, failed_obj in enumerate(failed_objects, 1):
                click.echo(f"Failed object {i}: {failed_obj}")
        else:
            click.echo(f"All objects successfully inserted into '{collection}'.")

0 commit comments

Comments
 (0)