from fastapi import FastAPI
from dotenv import load_dotenv
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
WorkerType,
cli,
llm,
metrics
)
from livekit import api
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, deepgram, silero, cartesia
import os
import asyncio
import sys
import logging
# Load API credentials (LiveKit, OpenAI, Deepgram, Cartesia) from a local
# .env file before any of the plugin clients below are constructed.
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# NOTE(review): `app` is never referenced elsewhere in this file — presumably
# the hosting platform requires an ASGI app object to exist; confirm before
# removing.
app = FastAPI()
async def entrypoint(ctx: JobContext):
    """LiveKit job entrypoint: connect to the room and run a voice agent.

    Builds a Silero-VAD / Deepgram-STT / OpenAI-LLM / Cartesia-TTS pipeline,
    wires usage-metrics collection into a shutdown summary, then greets the
    user once the pipeline has started.

    Args:
        ctx: The job context handed to this worker by the LiveKit scheduler.
    """
    initial_ctx = llm.ChatContext().append(
        role="system",
        text="You are a voice assistant created by Cerebrium. Your interface with users will be voice. You should use short and concise responses, and avoiding usage of unpronouncable punctuation.",
    )
    # A voice agent only needs audio; skip subscribing to video tracks.
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    agent = VoicePipelineAgent(
        vad=silero.VAD.load(),
        # flexibility to use any models
        stt=deepgram.STT(model="nova-2-general"),
        llm=openai.LLM(
            model="gpt-4o-mini",
            temperature=0.5,
        ),
        tts=cartesia.TTS(),
        # initial ChatContext with system prompt
        chat_ctx=initial_ctx,
        # whether the agent can be interrupted
        allow_interruptions=True,
        # sensitivity of when to interrupt
        interrupt_speech_duration=0.5,
        interrupt_min_words=0,
        # minimal silence duration to consider end of turn
        min_endpointing_delay=0.3,
        # BUG FIX: the original passed fnc_ctx=fnc_ctx, but no `fnc_ctx`
        # was ever defined in this file, so every job crashed with a
        # NameError before the agent was created. No function-calling
        # context is used here, so the argument is simply omitted.
    )

    usage_collector = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
        # Log each metrics event and fold it into the running usage total.
        metrics.log_metrics(mtrcs)
        usage_collector.collect(mtrcs)

    async def log_usage():
        # Runs at shutdown: report the aggregated usage for this job.
        summary = usage_collector.get_summary()
        print(f"Usage: ${summary}")

    ctx.add_shutdown_callback(log_usage)

    agent.start(ctx.room)
    # Brief pause so the pipeline is fully started before the greeting.
    await asyncio.sleep(1.2)
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
if __name__ == '__main__':
    # The LiveKit CLI expects a subcommand; default to "start" when the
    # process is launched with no arguments.
    if not sys.argv[1:]:
        sys.argv.append('start')
    worker_opts = WorkerOptions(
        entrypoint_fnc=entrypoint,
        worker_type=WorkerType.ROOM,
        port=8600,
    )
    cli.run_app(worker_opts)