Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
A powerful command-line interface for managing and interacting with Weaviate vector databases directly from your terminal.

## Key Features
- **Collections**: Create, update, delete and get collection configurations
- **Collections**: Create, batch, update, delete and get collection configurations
- **Data Management**: Import, query, update and delete data with various search types (vector, keyword, hybrid)
- **Multi-tenancy**: Manage tenants and their states across collections
- **Backup & Restore**: Create and restore backups with support for S3, GCS and filesystem
Expand Down Expand Up @@ -40,13 +40,17 @@ weaviate-cli create collection --collection movies --vectorizer transformers
# Import test data
weaviate-cli create data --collection movies --limit 1000

# Batch data import from local json file
weaviate-cli batch insert --collection <COL_NAME> --path <LOCAL_FILE_PATH.json> --vectorizer <VEC_NAME e.g. openai> --replication-factor 3

# Query data
weaviate-cli query data --collection movies --search-type hybrid --query "action movies"
```

## Core Commands

- **create**: Create collections, tenants, backups or import data
- **batch**: batch import a collection from a local file
- **delete**: Remove collections, tenants or data
- **update**: Modify collection settings, tenant states or data
- **get**: Retrieve collection info, tenant details or shard status
Expand Down
8 changes: 8 additions & 0 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
from weaviate_cli.commands.query import query
from weaviate_cli.commands.restore import restore
from weaviate_cli.commands.cancel import cancel
<<<<<<< HEAD
from weaviate_cli.commands.assign import assign
from weaviate_cli.commands.revoke import revoke
=======
from weaviate_cli.commands.batch import batch
>>>>>>> 19aebe7 (new feature - add batch import from a local json file)
from weaviate_cli import __version__


Expand Down Expand Up @@ -60,8 +64,12 @@ def main(ctx: click.Context, config_file: Optional[str], user: Optional[str]):
main.add_command(restore)
main.add_command(query)
main.add_command(cancel)
<<<<<<< HEAD
main.add_command(assign)
main.add_command(revoke)
=======
main.add_command(batch)
>>>>>>> 19aebe7 (new feature - add batch import from a local json file)

if __name__ == "__main__":
main()
103 changes: 103 additions & 0 deletions weaviate_cli/commands/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import click
import sys
import os
import json
from weaviate_cli.utils import get_client_from_context
from weaviate.exceptions import WeaviateConnectionError
from weaviate_cli.defaults import CreateCollectionDefaults
from weaviate_cli.managers.batch_manager import BatchManager


@click.group()
def batch() -> None:
"""Batch operations in Weaviate."""
pass


@batch.command("insert")
@click.option(
"--collection",
required=True,
help="The name of the collection (class) to insert data into.",
)
@click.option(
"--path",
required=True,
type=str,
help="Path to the JSON file containing the data.",
)
@click.option(
"--vectorizer",
default=CreateCollectionDefaults.vectorizer,
type=click.Choice(
["contextionary", "transformers", "openai", "ollama", "cohere", "jinaai"]
),
help="Vectorizer to use.",
)
@click.option(
"--shards",
default=1,
help="Number of shards for the collection (default: 1).",
)
@click.option(
"--replication-factor",
default=1,
help="Replication factor for the collection (default: 1).",
)
@click.pass_context
def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_factor):
"""
Insert data into a Weaviate collection (class) in batch mode.
"""
# Validate the file path and extension
if not os.path.isfile(path):
click.echo(f"Error: The file {path} does not exist.")
sys.exit(1)
if not path.endswith(".json"):
click.echo("Error: The file must have a .json extension.")
sys.exit(1)

# Load the JSON data
try:
with open(path, "r") as file:
data = json.load(file)
except json.JSONDecodeError:
click.echo(f"Error: The file {path} is not a valid JSON file.")
sys.exit(1)

# Validate JSON structure
if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data):
click.echo(
"Error: The JSON file must contain a list of objects (e.g., [{...}, {...}])."
)
sys.exit(1)

# Initialize the Weaviate client
client = None
try:
client = get_client_from_context(ctx)
batch_manager = BatchManager(client)

# Create the collection (if it doesn't exist)
click.echo(f"Ensuring collection '{collection}' exists...")
batch_manager.create_collection(
collection=collection,
vectorizer=vectorizer,
shards=shards,
replication_factor=replication_factor,
force_auto_schema=True,
)

# Insert the data in batch mode
click.echo(f"Inserting data into collection '{collection}'...")
batch_manager.batch_insert(collection, data)

except WeaviateConnectionError as wce:
click.echo(f"Connection error: {wce}")
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}")
sys.exit(1)
finally:
if client:
client.close()
114 changes: 114 additions & 0 deletions weaviate_cli/managers/batch_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import click
from typing import Dict, List, Optional
import weaviate.classes.config as wvc
from weaviate.client import WeaviateClient


class BatchManager:
def __init__(self, client: WeaviateClient) -> None:
self.client = client

def create_collection(
self,
collection: str,
vectorizer: str = "contextionary",
shards: int = 1,
replication_factor: int = 1,
force_auto_schema: bool = True,
) -> None:
"""
Create a collection dynamically for batch insertion.

Args:
collection (str): Name of the collection to create.
vectorizer (str): Vectorizer type (e.g., openai, transformers).
shards (int): Number of shards for the collection.
replication_factor (int): Replication factor for the collection.
force_auto_schema (bool): Whether to let Weaviate infer schema from inserted data.
"""
if self.client.collections.exists(collection):
click.echo(f"Collection '{collection}' already exists. Skipping creation.")
return

# Map vectorizers to Weaviate configurations
vectorizer_map: Dict[str, wvc.VectorizerConfig] = {
"contextionary": wvc.Configure.Vectorizer.text2vec_contextionary(),
"transformers": wvc.Configure.Vectorizer.text2vec_transformers(),
"openai": wvc.Configure.Vectorizer.text2vec_openai(),
"ollama": wvc.Configure.Vectorizer.text2vec_ollama(
model="snowflake-arctic-embed:33m"
),
"cohere": wvc.Configure.Vectorizer.text2vec_cohere(),
"jinaai": wvc.Configure.Vectorizer.text2vec_jinaai(),
}

# Validate vectorizer
if vectorizer not in vectorizer_map:
raise ValueError(
f"Invalid vectorizer '{vectorizer}'. Choose from: {list(vectorizer_map.keys())}"
)

try:
# Create collection with configuration
self.client.collections.create(
name=collection,
vector_index_config=wvc.Configure.VectorIndex.hnsw(),
replication_config=wvc.Configure.replication(
factor=replication_factor,
async_enabled=False,
deletion_strategy=wvc.ReplicationDeletionStrategy.DELETE_ON_CONFLICT,
),
sharding_config=wvc.Configure.sharding(desired_count=shards),
vectorizer_config=vectorizer_map[vectorizer],
properties=None if force_auto_schema else [],
)
click.echo(
f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'."
)
except Exception as e:
raise Exception(f"Error creating collection '{collection}': {e}")

def batch_insert(
self,
collection: str,
data: List[Dict],
) -> None:
"""
Insert data into a collection in batch.

Args:
collection (str): Name of the collection.
data (List[Dict]): Data to be inserted.
"""
if not self.client.collections.exists(collection):
raise Exception(
f"Collection '{collection}' does not exist. Cannot insert data."
)

try:
# Perform batch insertion using Weaviate's dynamic batch
with self.client.batch.dynamic() as batch:
for record in data:
# Remove the reserved 'id' key, if present - to avoid Error message: WeaviateInsertInvalidPropertyError("It is forbidden to insert `id` or `vector`
if "id" in record:
record.pop("id")

# Add the object to the batch
batch.add_object(
collection=collection,
properties=record,
)
click.echo(
f"Processed record"
) # add '{record}' <- if you would like to see the record being processed
except Exception as e:
raise Exception(f"Batch insertion failed: {e}")

# Check for failed objects
failed_objects = self.client.batch.failed_objects
if failed_objects:
click.echo(f"Number of failed objects: {len(failed_objects)}")
for i, failed_obj in enumerate(failed_objects, 1):
click.echo(f"Failed object {i}: {failed_obj}")
else:
click.echo(f"All objects successfully inserted into '{collection}'.")
Loading