Description
pipecat version
0.0.91
Python version
3.12.8
Operating System
macOS
Question
We’re using the GeminiLiveLLM Vertex service in our Pipecat bot, and we want to implement a conversation-turn summarizer: whenever the prompt tokens go beyond a certain threshold, we’d like to automatically summarize the previous conversation and replace the context with that summary.
Is there an officially recommended way to handle context summarization with Pipecat + GeminiLiveLLM (Vertex), or any best practices for doing this?
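Here is a rough sketch of the kind of processor we have in mind: it watches the usage metrics the LLM emits and, once prompt tokens cross a threshold, replaces the running context with a summary. `TokenThresholdSummarizer`, `_summarize()`, and the threshold value are placeholders we made up; the frame and metrics classes are what we understand pipecat 0.0.91 to provide, but we may be holding them wrong:

```python
# A minimal sketch (not working code): watch LLM usage metrics and, past a
# prompt-token threshold, replace the context with a summary. The class name,
# `_summarize()`, and the threshold are hypothetical placeholders.
from pipecat.frames.frames import Frame, LLMMessagesUpdateFrame, MetricsFrame
from pipecat.metrics.metrics import LLMUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class TokenThresholdSummarizer(FrameProcessor):
    def __init__(self, threshold: int = 8000):
        super().__init__()
        self._threshold = threshold

    async def _summarize(self) -> str:
        # Placeholder: run a summarization prompt over the stored history here.
        return "Summary of the conversation so far."

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            for item in frame.data:
                if (
                    isinstance(item, LLMUsageMetricsData)
                    and item.value.prompt_tokens > self._threshold
                ):
                    summary = await self._summarize()
                    # Replace the running messages with just the summary.
                    await self.push_frame(
                        LLMMessagesUpdateFrame(
                            [{"role": "user", "content": summary}]
                        )
                    )
        await self.push_frame(frame, direction)
```

We’re not sure this is even the right hook (usage metrics flow downstream from the LLM, so this processor would have to sit after it, not before), which is part of why we’re asking about the recommended approach.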
What I've tried
I have tried using Pipecat Flows, but it isn't helping: it throws the following error even though the dependencies are installed:
Make sure you have installed the required dependency:
- For OpenAI: pip install 'pipecat-ai[openai]'
- For Anthropic: pip install 'pipecat-ai[anthropic]'
- For Google: pip install 'pipecat-ai[google]'
- For Bedrock: pip install 'pipecat-ai[aws]'
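For reference, this is roughly how we wire up Flows (a sketch; `flow_config` is our flow definition, omitted here, and the names follow the pipecat-flows README as we understand it):

```python
# Rough reproduction of our Flows setup; `flow_config` (our flow definition)
# is omitted. The adapter error above is raised during this initialization.
from pipecat_flows import FlowManager

flow_manager = FlowManager(
    task=task,
    llm=llm,  # the GeminiLiveLLM (Vertex) service instance
    context_aggregator=context_aggregator,
    flow_config=flow_config,
)

# Called once the pipeline is running, per the pipecat-flows docs
await flow_manager.initialize()
```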
Does Pipecat Flows also work with the Gemini Vertex LLM service?
Context
Existing code snippet for reference:
```python
from loguru import logger

from pipecat.frames.frames import EndTaskFrame, LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.user_idle_processor import UserIdleProcessor

# SYSTEM_PROMPT, llm, transport, and tts_service are defined elsewhere in
# our bot; GeminiLiveContext and ContextSummarizer come from our service setup.
context = GeminiLiveContext(
    [
        {
            "role": "user",
            "content": SYSTEM_PROMPT,
        }
    ]
)
context_aggregator = llm.create_context_aggregator(context)


# Create user idle handler with retry callback
async def handle_user_idle(processor: UserIdleProcessor, retry_count: int) -> bool:
    """Handle user idle with escalating prompts."""
    logger.info(f"User idle detected, retry count: {retry_count}")
    if retry_count == 1:
        user_instruction = "ask me if I am able to hear you"
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True)
        )
        return True  # Continue monitoring
    elif retry_count == 2:
        user_instruction = "ask me if I am still here"
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True)
        )
        return True  # Continue monitoring
    elif retry_count == 3:
        # Final attempt: speak the message.
        user_instruction = (
            "Tell me that you are not able to hear me, and you are "
            "disconnecting the call and will call back again"
        )
        await processor.push_frame(
            LLMMessagesAppendFrame([{"role": "user", "content": user_instruction}], run_llm=True)
        )
        return True  # Continue monitoring to allow the message to be spoken
    elif retry_count == 4:
        # Terminate the call after the final message has been spoken.
        await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
        return False  # Stop monitoring
    else:
        logger.info(f"User idle after {retry_count} retries, stopping idle monitoring")
        return False


summarizer = ContextSummarizer(llm=llm, keep_last_messages=6)

# Create idle processor with a 5-second timeout
user_idle = UserIdleProcessor(
    callback=handle_user_idle,
    timeout=5.0,
)

pipeline_processors = [
    transport.input(),
    user_idle,  # Monitor user idle/activity
    context_aggregator.user(),
    summarizer,
    llm,  # LLM
]
if tts_service:
    pipeline_processors.append(tts_service)
pipeline_processors.extend([
    transport.output(),
    context_aggregator.assistant(),
])

pipeline = Pipeline(pipeline_processors)
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    ),
)
```