From 0 to 1 Tutorial | Building a Voice Bot with the GPT-4o Realtime API in Python

Voice technology is transforming how we interact with machines, making conversations with AI more natural than ever before.


With the public beta release of the Realtime API powered by GPT-4o, developers now have the tools to create low-latency, multimodal voice experiences in their applications, opening up endless possibilities for innovation.

Gone are the days when building a voice bot meant stitching together separate models for transcription, reasoning, and text-to-speech.


With the Realtime API, developers can now streamline the entire pipeline into a single API call, enabling fluid, natural speech-to-speech conversations. This is a game changer for industries such as customer support, education, and real-time language translation, where fast, seamless interaction is critical.




This guide walks you step by step through building your first real-time voice bot from scratch with the GPT-4o realtime model.

We will cover the main capabilities of the Realtime API, how to set up a WebSocket connection for audio streaming, and how to use the API's ability to handle interruptions and make function calls.

By the end, you will be ready to create a voice bot that responds to users with near-human accuracy and emotional nuance. Ready? Let's get started!

Key Features

  • Low-latency streaming: Supports real-time audio input and output for natural, seamless conversations.
  • Multimodal support: Handles both text and audio input and output, enabling multiple interaction modes.
  • Preset voices: Supports six predefined voices, ensuring quality and consistency of responses.
  • Function calling: Lets the voice assistant perform actions or retrieve context-specific information dynamically.
  • Safety and privacy: Includes multiple layers of safety protection, including automated monitoring and compliance with privacy policies.

How the GPT-4o Realtime API Works

Traditionally, building a voice assistant meant chaining several models together: an automatic speech recognition (ASR) model such as Whisper to transcribe audio, a text-based model to reason about the response, and a text-to-speech (TTS) model to generate audio output. This multi-step pipeline often introduced latency and lost emotional nuance.


The GPT-4o Realtime API changes this by consolidating these capabilities into a single API call. By establishing a persistent WebSocket connection, developers can stream audio input and output directly, dramatically reducing latency and making conversations feel more natural. The API's function-calling capability also lets a voice bot take actions such as placing orders or retrieving customer information on the fly.
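
To make the single-socket idea concrete, here is a minimal hedged sketch (Python, using the websockets package and the Azure environment variables configured later in this tutorial) that opens one Realtime connection and updates the session over it:

import asyncio
import json
import os

import websockets

async def main():
    # websockets needs a wss:// scheme, so the https:// endpoint from .env is rewritten.
    base = os.environ["AZURE_OPENAI_ENDPOINT"].replace("https://", "wss://").rstrip("/")
    url = (
        f"{base}/openai/realtime?api-version=2024-10-01-preview"
        f"&deployment={os.environ['AZURE_OPENAI_DEPLOYMENT']}"
        f"&api-key={os.environ['AZURE_OPENAI_API_KEY']}"
    )
    async with websockets.connect(url) as ws:
        # A single persistent socket carries session config, audio, and response events.
        await ws.send(json.dumps({
            "type": "session.update",
            "session": {"modalities": ["text", "audio"], "voice": "shimmer"},
        }))
        print(json.loads(await ws.recv())["type"])  # e.g. "session.created"

asyncio.run(main())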





Building the Real-Time Voice Bot

Let's dive into the step-by-step process of building your own real-time voice bot with the GPT-4o Realtime API.

Prerequisites

Before you begin, make sure you have the following:

  • Azure subscription: Register an enterprise account through a Microsoft partner.

PS: OpenAI blocked API access from mainland China this year, but Microsoft's Azure OpenAI service still offers enterprise users a compliant, stable way to use ChatGPT. For compliance reasons, companies in China can choose Azure OpenAI to access the API.

Reference: Microsoft Azure OpenAI enterprise account API application


  • Azure OpenAI resource: Set up in a supported region (East US 2 or Sweden Central).
  • Development environment: Familiarity with Python and basic asynchronous programming.
  • Client libraries: Tools such as LiveKit, Agora, or Twilio can extend your bot's capabilities.

Setting Up the API

  1. Deploy the GPT-4o realtime model:
  • Navigate to Azure AI Studio.
  • Open the model catalog and search for gpt-4o-realtime-preview.
  • Deploy the model by selecting your Azure OpenAI resource and configuring the deployment settings.
  2. Configure audio input and output:
  • The API supports various audio formats, primarily pcm16.
  • Set up your client to handle audio streams so it stays compatible with the API's requirements (a small pcm16 sketch follows this list).
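
To make the pcm16 requirement concrete, here is a small hedged sketch that reads a mono 16-bit WAV file (placeholder file name; the expected sample rate, commonly 24 kHz for this API, should be checked against the current docs) and base64-encodes the raw bytes the way the realtime audio events expect:

import base64
import wave

# Placeholder file; assumed mono, 16-bit PCM to match the API's pcm16 format.
with wave.open("utterance.wav", "rb") as wav:
    assert wav.getnchannels() == 1 and wav.getsampwidth() == 2
    pcm16_bytes = wav.readframes(wav.getnframes())

# Realtime audio events carry base64-encoded pcm16 payloads.
audio_b64 = base64.b64encode(pcm16_bytes).decode("ascii")
print(f"{len(pcm16_bytes)} raw bytes -> {len(audio_b64)} base64 chars")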

This project demonstrates how to build a sophisticated real-time conversational AI system with Azure OpenAI. By using a WebSocket connection and an event-driven architecture, the system delivers responsive, context-aware customer support in any language. The approach can be adapted to a variety of languages and use cases, making it a versatile solution for businesses looking to enhance their customer service capabilities. The project consists of three main parts:

  • Realtime API: Handles the WebSocket connection to the Azure OpenAI Realtime API.
  • Tools: Define the various customer support functions, such as checking order status and processing returns.
  • Application: Manages the interaction flow and integrates the realtime client with the UI layer.

Environment Setup

Create a .env file and set the following environment variables:

AZURE_OPENAI_API_KEY=XXXX
# replace with your Azure OpenAI API Key

AZURE_OPENAI_ENDPOINT=https://xxxx.openai.azure.com/
# replace with your Azure OpenAI Endpoint

AZURE_OPENAI_DEPLOYMENT=gpt-4o-realtime-preview
# Create a deployment for the gpt-4o-realtime-preview model and put its name here (any deployment name you chose works).

AZURE_OPENAI_CHAT_DEPLOYMENT_VERSION=2024-10-01-preview
# You don't need to change this unless you want to try other API versions.
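
A quick way to confirm the variables are picked up (a small sketch using the python-dotenv package from requirements.txt below):

import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
for name in ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_DEPLOYMENT"):
    print(name, "=", os.environ[name])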

requirements.txt

chainlit==1.3.0rc1
openai
beautifulsoup4
lxml
python-dotenv
websockets
aiohttp

Implementing the Realtime Client

At the heart of the voice bot is the realtime client, which manages the WebSocket connection and handles communication with the GPT-4o Realtime API. The RealtimeAPI class owns that connection: it sends and receives messages, dispatches events, and maintains connection state.

Its key responsibilities include:

  • Connection management: establishes and maintains the WebSocket connection.
  • Event dispatching: uses an event-driven architecture to handle incoming and outgoing messages.
  • Audio handling: converts audio between base64 and array buffers with small utility functions (sketched below) and manages audio streams to keep latency low and voice quality high.
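
The code in this section references two small utilities, base64_to_array_buffer and array_buffer_to_base64, that live elsewhere in the project. A plausible minimal sketch, assuming numpy and int16 (pcm16) audio:

import base64

import numpy as np

def base64_to_array_buffer(b64_string: str) -> np.ndarray:
    """Decode a base64 pcm16 payload into an int16 numpy array."""
    return np.frombuffer(base64.b64decode(b64_string), dtype=np.int16)

def array_buffer_to_base64(array: np.ndarray) -> str:
    """Encode an int16 numpy array as a base64 string for the API."""
    return base64.b64encode(array.astype(np.int16).tobytes()).decode("ascii")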

Key components:

  • RealtimeAPI class:
  • Establishes and maintains the WebSocket connection.
  • Handles sending and receiving messages.
  • Manages event dispatching for the various conversation events.

import asyncio
import json
import logging
import os
from datetime import datetime

import websockets

logger = logging.getLogger(__name__)

class RealtimeAPI(RealtimeEventHandler):
    def __init__(self):
        super().__init__()
        self.default_url = 'wss://api.openai.com/v1/realtime'
        # websockets needs a ws(s):// scheme, but the .env stores an https:// endpoint.
        self.url = os.environ["AZURE_OPENAI_ENDPOINT"].replace("https://", "wss://").rstrip("/")
        self.api_key = os.environ["AZURE_OPENAI_API_KEY"]
        self.api_version = "2024-10-01-preview"
        self.azure_deployment = os.environ["AZURE_OPENAI_DEPLOYMENT"]
        self.ws = None

    def is_connected(self):
        return self.ws is not None

    def log(self, *args):
        # logging treats extra positional args as %-format values, so join them explicitly.
        logger.debug(f"[Websocket/{datetime.utcnow().isoformat()}] " + " ".join(str(a) for a in args))

    async def connect(self, model='gpt-4o-realtime-preview'):
        if self.is_connected():
            raise Exception("Already connected")
        self.ws = await websockets.connect(f"{self.url}/openai/realtime?api-version={self.api_version}&deployment={model}&api-key={self.api_key}", extra_headers={
            'Authorization': f'Bearer {self.api_key}',
            'OpenAI-Beta': 'realtime=v1'
        })
        self.log(f"Connected to {self.url}")
        asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        async for message in self.ws:
            event = json.loads(message)
            if event['type'] == "error":
                logger.error("ERROR", message)
            self.log("received:", event)
            self.dispatch(f"server.{event['type']}", event)
            self.dispatch("server.*", event)

    async def send(self, event_name, data=None):
        if not self.is_connected():
            raise Exception("RealtimeAPI is not connected")
        data = data or {}
        if not isinstance(data, dict):
            raise Exception("data must be a dictionary")
        event = {
            "event_id": self._generate_id("evt_"),
            "type": event_name,
            **data
        }
        self.dispatch(f"client.{event_name}", event)
        self.dispatch("client.*", event)
        self.log("sent:", event)
        await self.ws.send(json.dumps(event))

    def _generate_id(self, prefix):
        return f"{prefix}{int(datetime.utcnow().timestamp() * 1000)}"

    async def disconnect(self):
        if self.ws:
            await self.ws.close()
            self.ws = None
            self.log(f"Disconnected from {self.url}")

Reference: init.py
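
RealtimeAPI and RealtimeClient both inherit from a RealtimeEventHandler base class that this article does not show. Based on how it is used here (on, dispatch, clear_event_handlers, wait_for_next), a minimal sketch could look like the following; treat it as an assumption, not the project's actual implementation:

import asyncio
import fnmatch
import inspect

class RealtimeEventHandler:
    def __init__(self):
        self._handlers = {}  # pattern -> list of callables

    def on(self, pattern, handler):
        self._handlers.setdefault(pattern, []).append(handler)

    def clear_event_handlers(self):
        self._handlers = {}

    def dispatch(self, event_name, event):
        # Patterns like "server.*" are matched with fnmatch-style wildcards.
        for pattern, handlers in self._handlers.items():
            if fnmatch.fnmatch(event_name, pattern):
                for handler in handlers:
                    result = handler(event)
                    if inspect.isawaitable(result):
                        asyncio.create_task(result)

    async def wait_for_next(self, event_name):
        # Resolve once the next matching event is dispatched.
        future = asyncio.get_running_loop().create_future()
        self.on(event_name, lambda event: future.done() or future.set_result(event))
        return await future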

  • RealtimeConversation class:
  • Manages the state of the conversation.
  • Handles the different event types, such as item creation, transcription completion, and audio streaming.
  • Queues and formats audio and text data for seamless interaction.

class RealtimeConversation:
    # `config` is the project's settings object (not shown here); pcm16 audio
    # in the Realtime API is sampled at 24000 Hz.
    default_frequency = config.features.audio.sample_rate
    
    EventProcessors = {
        'conversation.item.created': lambda self, event: self._process_item_created(event),
        'conversation.item.truncated': lambda self, event: self._process_item_truncated(event),
        'conversation.item.deleted': lambda self, event: self._process_item_deleted(event),
        'conversation.item.input_audio_transcription.completed': lambda self, event: self._process_input_audio_transcription_completed(event),
        'input_audio_buffer.speech_started': lambda self, event: self._process_speech_started(event),
        'input_audio_buffer.speech_stopped': lambda self, event, input_audio_buffer: self._process_speech_stopped(event, input_audio_buffer),
        'response.created': lambda self, event: self._process_response_created(event),
        'response.output_item.added': lambda self, event: self._process_output_item_added(event),
        'response.output_item.done': lambda self, event: self._process_output_item_done(event),
        'response.content_part.added': lambda self, event: self._process_content_part_added(event),
        'response.audio_transcript.delta': lambda self, event: self._process_audio_transcript_delta(event),
        'response.audio.delta': lambda self, event: self._process_audio_delta(event),
        'response.text.delta': lambda self, event: self._process_text_delta(event),
        'response.function_call_arguments.delta': lambda self, event: self._process_function_call_arguments_delta(event),
    }
    
    def __init__(self):
        self.clear()

    def clear(self):
        self.item_lookup = {}
        self.items = []
        self.response_lookup = {}
        self.responses = []
        self.queued_speech_items = {}
        self.queued_transcript_items = {}
        self.queued_input_audio = None

    def queue_input_audio(self, input_audio):
        self.queued_input_audio = input_audio

    def process_event(self, event, *args):
        event_processor = self.EventProcessors.get(event['type'])
        if not event_processor:
            raise Exception(f"Missing conversation event processor for {event['type']}")
        return event_processor(self, event, *args)

    def get_item(self, id):
        return self.item_lookup.get(id)

    def get_items(self):
        return self.items[:]

    def _process_item_created(self, event):
        item = event['item']
        new_item = item.copy()
        if new_item['id'] not in self.item_lookup:
            self.item_lookup[new_item['id']] = new_item
            self.items.append(new_item)
        new_item['formatted'] = {
            'audio': [],
            'text': '',
            'transcript': ''
        }
        if new_item['id'] in self.queued_speech_items:
            new_item['formatted']['audio'] = self.queued_speech_items[new_item['id']]['audio']
            del self.queued_speech_items[new_item['id']]
        if 'content' in new_item:
            text_content = [c for c in new_item['content'] if c['type'] in ['text', 'input_text']]
            for content in text_content:
                new_item['formatted']['text'] += content['text']
        if new_item['id'] in self.queued_transcript_items:
            new_item['formatted']['transcript'] = self.queued_transcript_items[new_item['id']]['transcript']
            del self.queued_transcript_items[new_item['id']]
        if new_item['type'] == 'message':
            if new_item['role'] == 'user':
                new_item['status'] = 'completed'
                if self.queued_input_audio:
                    new_item['formatted']['audio'] = self.queued_input_audio
                    self.queued_input_audio = None
            else:
                new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call':
            new_item['formatted']['tool'] = {
                'type': 'function',
                'name': new_item['name'],
                'call_id': new_item['call_id'],
                'arguments': ''
            }
            new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call_output':
            new_item['status'] = 'completed'
            new_item['formatted']['output'] = new_item['output']
        return new_item, None

    def _process_item_truncated(self, event):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.truncated: Item "{item_id}" not found')
        end_index = (audio_end_ms * self.default_frequency) // 1000
        item['formatted']['transcript'] = ''
        item['formatted']['audio'] = item['formatted']['audio'][:end_index]
        return item, None

    def _process_item_deleted(self, event):
        item_id = event['item_id']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.deleted: Item "{item_id}" not found')
        del self.item_lookup[item['id']]
        self.items.remove(item)
        return item, None

    def _process_input_audio_transcription_completed(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        transcript = event['transcript']
        formatted_transcript = transcript or ' '
        item = self.item_lookup.get(item_id)
        if not item:
            self.queued_transcript_items[item_id] = {'transcript': formatted_transcript}
            return None, None
        item['content'][content_index]['transcript'] = transcript
        item['formatted']['transcript'] = formatted_transcript
        return item, {'transcript': transcript}

    def _process_speech_started(self, event):
        item_id = event['item_id']
        audio_start_ms = event['audio_start_ms']
        self.queued_speech_items[item_id] = {'audio_start_ms': audio_start_ms}
        return None, None

    def _process_speech_stopped(self, event, input_audio_buffer):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        speech = self.queued_speech_items[item_id]
        speech['audio_end_ms'] = audio_end_ms
        if input_audio_buffer:
            start_index = (speech['audio_start_ms'] * self.default_frequency) // 1000
            end_index = (speech['audio_end_ms'] * self.default_frequency) // 1000
            speech['audio'] = input_audio_buffer[start_index:end_index]
        return None, None

    def _process_response_created(self, event):
        response = event['response']
        if response['id'] not in self.response_lookup:
            self.response_lookup[response['id']] = response
            self.responses.append(response)
        return None, None

    def _process_output_item_added(self, event):
        response_id = event['response_id']
        item = event['item']
        response = self.response_lookup.get(response_id)
        if not response:
            raise Exception(f'response.output_item.added: Response "{response_id}" not found')
        response['output'].append(item['id'])
        return None, None

    def _process_output_item_done(self, event):
        item = event['item']
        if not item:
            raise Exception('response.output_item.done: Missing "item"')
        found_item = self.item_lookup.get(item['id'])
        if not found_item:
            raise Exception(f'response.output_item.done: Item "{item["id"]}" not found')
        found_item['status'] = item['status']
        return found_item, None

    def _process_content_part_added(self, event):
        item_id = event['item_id']
        part = event['part']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.content_part.added: Item "{item_id}" not found')
        item['content'].append(part)
        return item, None

    def _process_audio_transcript_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.audio_transcript.delta: Item "{item_id}" not found')
        item['content'][content_index]['transcript'] += delta
        item['formatted']['transcript'] += delta
        return item, {'transcript': delta}

    def _process_audio_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            logger.debug(f'response.audio.delta: Item "{item_id}" not found')
            return None, None
        array_buffer = base64_to_array_buffer(delta)
        append_values = array_buffer.tobytes()
        # TODO: make it work
        # item['formatted']['audio'] = merge_int16_arrays(item['formatted']['audio'], append_values)
        return item, {'audio': append_values}

    def _process_text_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.text.delta: Item "{item_id}" not found')
        item['content'][content_index]['text'] += delta
        item['formatted']['text'] += delta
        return item, {'text': delta}

    def _process_function_call_arguments_delta(self, event):
        item_id = event['item_id']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.function_call_arguments.delta: Item "{item_id}" not found')
        item['arguments'] += delta
        item['formatted']['tool']['arguments'] += delta
        return item, {'arguments': delta}

  • RealtimeClient class:
  • Initialization: sets the system prompt and session configuration, and instantiates RealtimeAPI and RealtimeConversation to manage the WebSocket connection and conversation events.
  • Connection management: handles connecting to and disconnecting from the server, waiting for session creation, and updating session settings.
  • Event handling: listens for server and client events and dispatches them to the appropriate handlers.
  • Conversation management: manages creating, updating, and deleting conversation items, including handling input audio and speech events.
  • Tool and response management: supports adding/removing tools, invoking tools in response to events, sending user messages, creating responses, and managing audio content.

class RealtimeClient(RealtimeEventHandler):
    def __init__(self, system_prompt: str):
        super().__init__()
        self.system_prompt = system_prompt
        self.default_session_config = {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": "shimmer",
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": { "model": 'whisper-1' },
            "turn_detection": { "type": 'server_vad' },
            "tools": [],
            "tool_choice": "auto",
            "temperature": 0.8,
            "max_response_output_tokens": 4096,
        }
        self.session_config = {}
        self.transcription_models = [{"model": "whisper-1"}]
        self.default_server_vad_config = {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 200,
        }
        self.realtime = RealtimeAPI()
        self.conversation = RealtimeConversation()
        self._reset_config()
        self._add_api_event_handlers()
        
    def _reset_config(self):
        self.session_created = False
        self.tools = {}
        self.session_config = self.default_session_config.copy()
        self.input_audio_buffer = bytearray()
        return True

    def _add_api_event_handlers(self):
        self.realtime.on("client.*", self._log_event)
        self.realtime.on("server.*", self._log_event)
        self.realtime.on("server.session.created", self._on_session_created)
        self.realtime.on("server.response.created", self._process_event)
        self.realtime.on("server.response.output_item.added", self._process_event)
        self.realtime.on("server.response.content_part.added", self._process_event)
        self.realtime.on("server.input_audio_buffer.speech_started", self._on_speech_started)
        self.realtime.on("server.input_audio_buffer.speech_stopped", self._on_speech_stopped)
        self.realtime.on("server.conversation.item.created", self._on_item_created)
        self.realtime.on("server.conversation.item.truncated", self._process_event)
        self.realtime.on("server.conversation.item.deleted", self._process_event)
        self.realtime.on("server.conversation.item.input_audio_transcription.completed", self._process_event)
        self.realtime.on("server.response.audio_transcript.delta", self._process_event)
        self.realtime.on("server.response.audio.delta", self._process_event)
        self.realtime.on("server.response.text.delta", self._process_event)
        self.realtime.on("server.response.function_call_arguments.delta", self._process_event)
        self.realtime.on("server.response.output_item.done", self._on_output_item_done)

    def _log_event(self, event):
        realtime_event = {
            "time": datetime.utcnow().isoformat(),
            "source": "client" if event["type"].startswith("client.") else "server",
            "event": event,
        }
        self.dispatch("realtime.event", realtime_event)

    def _on_session_created(self, event):
        self.session_created = True

    def _process_event(self, event, *args):
        item, delta = self.conversation.process_event(event, *args)
        if item:
            self.dispatch("conversation.updated", {"item": item, "delta": delta})
        return item, delta

    def _on_speech_started(self, event):
        self._process_event(event)
        self.dispatch("conversation.interrupted", event)

    def _on_speech_stopped(self, event):
        self._process_event(event, self.input_audio_buffer)

    def _on_item_created(self, event):
        item, delta = self._process_event(event)
        self.dispatch("conversation.item.appended", {"item": item})
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})

    async def _on_output_item_done(self, event):
        item, delta = self._process_event(event)
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})
        if item and item.get("formatted", {}).get("tool"):
            await self._call_tool(item["formatted"]["tool"])

    async def _call_tool(self, tool):
        try:
            print(tool["arguments"])
            json_arguments = json.loads(tool["arguments"])
            tool_config = self.tools.get(tool["name"])
            if not tool_config:
                raise Exception(f'Tool "{tool["name"]}" has not been added')
            result = await tool_config["handler"](**json_arguments)
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps(result),
                }
            })
        except Exception as e:
            logger.error(traceback.format_exc())
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps({"error": str(e)}),
                }
            })
        await self.create_response()

    def is_connected(self):
        return self.realtime.is_connected()

    async def reset(self):
        # disconnect() is a coroutine, so reset must await it to take effect.
        await self.disconnect()
        self.realtime.clear_event_handlers()
        self._reset_config()
        self._add_api_event_handlers()
        return True

    async def connect(self):
        if self.is_connected():
            raise Exception("Already connected, use .disconnect() first")
        await self.realtime.connect()
        await self.update_session()
        return True

    async def wait_for_session_created(self):
        if not self.is_connected():
            raise Exception("Not connected, use .connect() first")
        while not self.session_created:
            await asyncio.sleep(0.001)
        return True

    async def disconnect(self):
        self.session_created = False
        self.conversation.clear()
        if self.realtime.is_connected():
            await self.realtime.disconnect()

    def get_turn_detection_type(self):
        return self.session_config.get("turn_detection", {}).get("type")

    async def add_tool(self, definition, handler):
        if not definition.get("name"):
            raise Exception("Missing tool name in definition")
        name = definition["name"]
        if name in self.tools:
            raise Exception(f'Tool "{name}" already added. Please use .removeTool("{name}") before trying to add again.')
        if not callable(handler):
            raise Exception(f'Tool "{name}" handler must be a function')
        self.tools[name] = {"definition": definition, "handler": handler}
        await self.update_session()
        return self.tools[name]

    def remove_tool(self, name):
        if name not in self.tools:
            raise Exception(f'Tool "{name}" does not exist, can not be removed.')
        del self.tools[name]
        return True

    async def delete_item(self, id):
        await self.realtime.send("conversation.item.delete", {"item_id": id})
        return True

    async def update_session(self, **kwargs):
        self.session_config.update(kwargs)
        use_tools = [
            {**tool_definition, "type": "function"}
            for tool_definition in self.session_config.get("tools", [])
        ] + [
            {**self.tools[key]["definition"], "type": "function"}
            for key in self.tools
        ]
        session = {**self.session_config, "tools": use_tools}
        if self.realtime.is_connected():
            await self.realtime.send("session.update", {"session": session})
        return True
    
    async def create_conversation_item(self, item):
        await self.realtime.send("conversation.item.create", {
            "item": item
        })

    async def send_user_message_content(self, content=None):
        content = content or []  # avoid a mutable default argument
        if content:
            for c in content:
                if c["type"] == "input_audio":
                    if isinstance(c["audio"], (bytes, bytearray)):
                        c["audio"] = array_buffer_to_base64(c["audio"])
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                }
            })
        await self.create_response()
        return True

    async def append_input_audio(self, array_buffer):
        if len(array_buffer) > 0:
            await self.realtime.send("input_audio_buffer.append", {
                "audio": array_buffer_to_base64(np.array(array_buffer)),
            })
            self.input_audio_buffer.extend(array_buffer)
        return True

    async def create_response(self):
        if self.get_turn_detection_type() is None and len(self.input_audio_buffer) > 0:
            await self.realtime.send("input_audio_buffer.commit")
            self.conversation.queue_input_audio(self.input_audio_buffer)
            self.input_audio_buffer = bytearray()
        await self.realtime.send("response.create")
        return True

    async def cancel_response(self, id=None, sample_count=0):
        if not id:
            await self.realtime.send("response.cancel")
            return {"item": None}
        else:
            item = self.conversation.get_item(id)
            if not item:
                raise Exception(f'Could not find item "{id}"')
            if item["type"] != "message":
                raise Exception('Can only cancelResponse messages with type "message"')
            if item["role"] != "assistant":
                raise Exception('Can only cancelResponse messages with role "assistant"')
            await self.realtime.send("response.cancel")
            audio_index = next((i for i, c in enumerate(item["content"]) if c["type"] == "audio"), -1)
            if audio_index == -1:
                raise Exception("Could not find audio on item to cancel")
            await self.realtime.send("conversation.item.truncate", {
                "item_id": id,
                "content_index": audio_index,
                "audio_end_ms": int((sample_count / self.conversation.default_frequency) * 1000),
            })
            return {"item": item}

    async def wait_for_next_item(self):
        event = await self.wait_for_next("conversation.item.appended")
        return {"item": event["item"]}

    async def wait_for_next_completed_item(self):
        event = await self.wait_for_next("conversation.item.completed")
        return {"item": event["item"]}

Adding Tools and Handlers

You can extend the voice bot's capabilities by integrating tools and handlers, which let the bot perform specific actions based on user input.

  1. Define the tools:
  • In tool.py, define the bot's capabilities, such as checking order status, processing returns, or updating account information.
  • Each tool includes a name, a description, and its required parameters.
  2. Implement the handlers:
  • Create an asynchronous handler function for each tool to perform the desired action.
  • These handlers interact with your backend systems or databases to fulfill user requests.
  3. Integrate the tools with the realtime client:
  • Register each tool and its handler with RealtimeClient in your app.py file (see the registration sketch after the handler example below).
  • Make sure the bot can invoke these tools dynamically during a conversation.

Key components:

  • Tool definitions:
  • A structured description of each tool, including its required parameters and functionality.

Example:

# Function Definitions
check_order_status_def = {
    "name": "check_order_status",
    "description": "Check the status of a customer's order",
    "parameters": {
      "type": "object",
      "properties": {
        "customer_id": {
          "type": "string",
          "description": "The unique identifier for the customer"
        },
        "order_id": {
          "type": "string",
          "description": "The unique identifier for the order"
        }
      },
      "required": ["customer_id", "order_id"]
    }
}

  • Handler functions:
  • Asynchronous functions that implement each tool's logic.
  • They interact with external systems and databases, or perform specific actions based on the user's request.

Example:

import chainlit as cl

async def check_order_status_handler(customer_id, order_id):
    # Your business logic; fetch_order_details is a project-specific helper (not shown).
    estimated_delivery, status, order_date = fetch_order_details(order_id, customer_id)

    # Read the HTML template
    with open('order_status_template.html', 'r') as file:
        html_content = file.read()

    # Replace placeholders with actual data
    html_content = html_content.format(
        order_id=order_id,
        customer_id=customer_id,
        order_date=order_date.strftime("%B %d, %Y"),
        estimated_delivery=estimated_delivery.strftime("%B %d, %Y"),
        status=status
    )

    # Return the Chainlit message with HTML content
    await cl.Message(content=f"Here is the detail of your order \n {html_content}").send()
    return f"Order {order_id} status for customer {customer_id}: {status}"

Reference:

  • tool.py

Integrating with Your Application

With the realtime client and tools in place, it's time to bring everything together in your application.

  1. Initialize OpenAI Realtime:
  • In app.py, set up the connection to the GPT-4o Realtime API with the system prompt and session configuration.
  • Manage user sessions and track interactions seamlessly.
  2. Handle user interactions:
  • Implement event handlers for chat initiation, message receipt, audio processing, and session termination.
  • Make sure user input, whether text or voice, is processed appropriately and answered in real time.
  3. Manage the conversation flow:
  • Use the RealtimeConversation class to handle conversation state, manage audio streams, and maintain context.
  • Implement logic to handle interruptions, cancellations, and dynamic responses driven by user actions.

Key components:

  • Initialization:
  • Set up the OpenAI realtime client with the system prompt and configure the tools.

system_prompt = """Provide helpful and empathetic support responses to customer inquiries for ShopMe in Hindi language, addressing their requests, concerns, or feedback professionally.

Maintain a friendly and service-oriented tone throughout the interaction to ensure a positive customer experience.

# Steps

1. **Identify the Issue:** Carefully read the customer's inquiry to understand the problem or question they are presenting.
2. **Gather Relevant Information:** Check for any additional data needed, such as order numbers or account details, while ensuring the privacy and security of the customer's information.
3. **Formulate a Response:** Develop a solution or informative response based on the understanding of the issue. The response should be clear, concise, and address all parts of the customer's concern.
4. **Offer Further Assistance:** Invite the customer to reach out again if they need more help or have additional questions.
5. **Close Politely:** End the conversation with a polite closing statement that reinforces the service commitment of ShopMe.

# Output Format

Provide a clear and concise paragraph addressing the customer's inquiry, including:
- Acknowledgment of their concern
- Suggested solution or response
- Offer for further assistance
- Polite closing

# Notes
- Greet the user with "Welcome to ShopMe" the first time only
- Always speak in Hindi
- Ensure all customer data is handled according to relevant privacy and data protection laws and ShopMe's privacy policy.
- In cases of high sensitivity or complexity, escalate the issue to a human customer support agent.
- Keep responses within a reasonable length to ensure they are easy to read and understand."""

  • Event handlers:
  • Manage chat start, message receipt, audio streaming, and session termination events.

First, we initialize the realtime client discussed earlier:

async def setup_openai_realtime(system_prompt: str):
    """Instantiate and configure the OpenAI Realtime Client"""
    openai_realtime = RealtimeClient(system_prompt = system_prompt)
    cl.user_session.set("track_id", str(uuid4()))
    async def handle_conversation_updated(event):
        item = event.get("item")
        delta = event.get("delta")
        """Currently used to stream audio back to the client."""
        if delta:
            # Only one of the following will be populated for any given event
            if 'audio' in delta:
                audio = delta['audio']  # Int16Array, audio added
                await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
            if 'transcript' in delta:
                transcript = delta['transcript']  # string, transcript added
                pass
            if 'arguments' in delta:
                arguments = delta['arguments']  # string, function arguments added
                pass
            
    async def handle_item_completed(item):
        """Used to populate the chat context with transcription once an item is completed."""
        # print(item) # TODO
        pass
    
    async def handle_conversation_interrupt(event):
        """Used to cancel the client previous audio playback."""
        cl.user_session.set("track_id", str(uuid4()))
        await cl.context.emitter.send_audio_interrupt()
        
    async def handle_error(event):
        logger.error(event)

    # Wire the handlers to the events RealtimeClient dispatches, and keep the
    # client in the user session so the audio callback below can reach it.
    openai_realtime.on("conversation.updated", handle_conversation_updated)
    openai_realtime.on("conversation.item.completed", handle_item_completed)
    openai_realtime.on("conversation.interrupted", handle_conversation_interrupt)
    cl.user_session.set("openai_realtime", openai_realtime)

  • Session management:
  • Maintain user sessions, handle conversation interruptions, and keep the interaction flowing smoothly. As the code below shows, the idea is that whenever an audio chunk arrives from the user, you feed it to the realtime client.

@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):  # InputAudioChunk name assumed for this Chainlit version
    openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
    if openai_realtime:
        if openai_realtime.is_connected():
            await openai_realtime.append_input_audio(chunk.data)
        else:
            logger.info("RealtimeClient is not connected")

Reference: app.py

Testing and Deployment

Once your voice bot is built, thorough testing is essential to ensure reliability and user satisfaction.

  1. Local testing:
  • Use the AI Studio real-time audio playground to interact with your deployed model.
  • Test all the features, including speech recognition, response generation, and tool execution.
  2. Integration testing:
  • Make sure your application communicates seamlessly with the Realtime API.
  • Test the event handlers and tool integrations to verify correct behavior across scenarios.
  3. Deployment:
  • Deploy your application to a production environment, using cloud services for scalability.
  • Monitor performance and tune as needed for real-world usage.

Conclusion

With the GPT-4o Realtime API, building a real-time voice bot is simpler than ever. By consolidating speech-to-speech capabilities into one efficient interface, developers can craft engaging, natural conversational experiences without juggling multiple models. Whether you are enhancing customer support, building educational tools, or creating interactive applications, the GPT-4o Realtime API provides a solid foundation for bringing your voice bot vision to life.


As a Microsoft partner, 全云在线 can open a fast-track application channel for enterprises, covering the latest GPT-4o, o1, and realtime voice APIs; whatever the requirement, 全云在线 can help get access enabled quickly!


In the wave of digital transformation, AI has become an indispensable part of the enterprise. The continued availability of Microsoft's Azure OpenAI service gives qualifying enterprise users a powerful tool for staying competitive and driving business growth. Now is the time to act and build a solid foundation for your company's future with this service.


While this change to Microsoft's Azure OpenAI service poses a challenge for individual users, it is a major opportunity for enterprises. By continuing to use the service, companies can stay at the forefront of applied AI and gain an edge in a fiercely competitive market. Act now and start your enterprise AI journey with Azure OpenAI!
