
Need help implementing a conversation summarizer to reduce prompt token size #3088

@pratham1906

Description

pipecat version

0.0.91

Python version

3.12.8

Operating System

macOS

Question

We’re using the GeminiLiveLLM Vertex service in our Pipecat bot, and we want to implement a conversation-turn summarizer.
Whenever the prompt tokens go beyond a certain threshold, we’d like to automatically summarize the previous conversation and replace the context with that summary.

Is there an officially recommended way to handle context summarization with Pipecat + GeminiLiveLLM (Vertex), or any best practices for doing this?
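
To make the intent concrete, here is a rough sketch of the message-list transformation we have in mind. This is only placeholder code, not a Pipecat API: `condense_history` and `summarize_fn` are hypothetical names, and keeping the last N turns plus a summary-as-user-message is just one possible policy.

```python
from typing import Awaitable, Callable


async def condense_history(
    messages: list[dict],
    summarize_fn: Callable[[str], Awaitable[str]],
    keep_last_messages: int = 6,
) -> list[dict]:
    """Collapse everything between the system prompt and the last few
    turns into a single summary message (hypothetical helper)."""
    if len(messages) <= keep_last_messages + 1:
        return messages  # too short to be worth summarizing

    system = messages[0]
    head = messages[1:-keep_last_messages]   # turns to summarize away
    tail = messages[-keep_last_messages:]    # recent turns kept verbatim

    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in head)
    summary = await summarize_fn(
        "Summarize this conversation, preserving names, decisions, and "
        f"open questions:\n\n{transcript}"
    )
    return [
        system,
        {"role": "user", "content": f"Summary of the conversation so far: {summary}"},
        *tail,
    ]
```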

What I've tried

I have tried using Pipecat Flows, but it isn’t helping: it throws the following error even though the dependencies are installed:

```
Make sure you have installed the required dependency:
- For OpenAI: pip install 'pipecat-ai[openai]'
- For Anthropic: pip install 'pipecat-ai[anthropic]'
- For Google: pip install 'pipecat-ai[google]'
- For Bedrock: pip install 'pipecat-ai[aws]'
```

Does Pipecat Flows also work with the Gemini Vertex LLM service?

Context

Existing code snippet for reference:

```python
from loguru import logger

from pipecat.frames.frames import EndTaskFrame, LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.user_idle_processor import UserIdleProcessor

# llm, tts_service, transport, SYSTEM_PROMPT, GeminiLiveContext, and
# ContextSummarizer are created/imported elsewhere in our codebase.

context = GeminiLiveContext(
    [
        {
            "role": "user",
            "content": SYSTEM_PROMPT,
        }
    ]
)

context_aggregator = llm.create_context_aggregator(context)

# Create user idle handler with retry callback
async def handle_user_idle(processor: UserIdleProcessor, retry_count: int) -> bool:
    """Handle user idle with escalating prompts"""
    logger.info(f"User idle detected, retry count: {retry_count}")

    if retry_count == 1:
        user_instruction = "ask me if I am able to hear you"
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True))
        return True  # Continue monitoring
    elif retry_count == 2:
        user_instruction = "ask me if I am still here"
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True))
        return True  # Continue monitoring
    elif retry_count == 3:
        # Final attempt: speak the message.
        user_instruction = "Tell me that you are not able to hear me, and you are disconnecting the call and will call back again"
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True))
        return True  # Continue monitoring to allow message to be spoken
    elif retry_count == 4:
        # Terminate the call after the final message has been spoken.
        await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
        return False  # Stop monitoring
    else:
        logger.info(f"User idle after {retry_count} retries, stopping idle monitoring")
        return False

# Create the context summarizer (keeps the last 6 messages verbatim)
summarizer = ContextSummarizer(llm=llm, keep_last_messages=6)

# Create idle processor with 5 second timeout
user_idle = UserIdleProcessor(
    callback=handle_user_idle,
    timeout=5.0,
)

pipeline_processors = [
    transport.input(),
    user_idle,  # Monitor user idle/activity
    context_aggregator.user(),
    summarizer,  # Summarize context when it grows past the token threshold
    llm,  # GeminiLiveLLM (Vertex) service
]

if tts_service:
    pipeline_processors.append(tts_service)

pipeline_processors.extend([
    transport.output(),
    context_aggregator.assistant(),
])

pipeline = Pipeline(pipeline_processors)

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    ),
)

```
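
For reference, the `ContextSummarizer` used above is our own experimental processor, roughly shaped like the sketch below. It is untested and makes several assumptions: the constructor here takes the context object and a `summarize_fn` callable rather than the `llm` (I'm not sure how to issue a side summarization request against the live Gemini session), it assumes the context exposes a `.messages` list the way `OpenAILLMContext` does, it estimates tokens crudely as characters/4 instead of reading real usage metrics, and I'm not sure whether pushing `LLMMessagesUpdateFrame` upstream to the user context aggregator is the right way to replace the history.

```python
from typing import Awaitable, Callable

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    LLMMessagesUpdateFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class ContextSummarizer(FrameProcessor):
    """Experimental: replace older turns with an LLM-written summary once
    the context grows past a rough token threshold."""

    def __init__(
        self,
        *,
        context,
        summarize_fn: Callable[[str], Awaitable[str]],
        keep_last_messages: int = 6,
        token_threshold: int = 8000,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self._context = context            # the GeminiLiveContext from above
        self._summarize_fn = summarize_fn  # async str -> str summarizer
        self._keep = keep_last_messages
        self._threshold = token_threshold

    @staticmethod
    def _rough_tokens(messages) -> int:
        # Crude estimate: roughly 4 characters per token.
        return sum(len(str(m.get("content", ""))) for m in messages) // 4

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        # Check the context size after each completed LLM response.
        if isinstance(frame, LLMFullResponseEndFrame):
            messages = list(self._context.messages)  # assumes a .messages attr
            if (
                self._rough_tokens(messages) > self._threshold
                and len(messages) > self._keep + 1
            ):
                head = messages[1:-self._keep]
                tail = messages[-self._keep:]
                transcript = "\n".join(f"{m['role']}: {m['content']}" for m in head)
                summary = await self._summarize_fn(transcript)
                new_messages = [
                    messages[0],  # keep the original system/prompt message
                    {"role": "user", "content": f"Summary of the conversation so far: {summary}"},
                    *tail,
                ]
                # Pushed upstream so the user context aggregator (which sits
                # before this processor) can replace its history; unverified.
                await self.push_frame(
                    LLMMessagesUpdateFrame(messages=new_messages),
                    FrameDirection.UPSTREAM,
                )

        await self.push_frame(frame, direction)
```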
