
构建实时gemini 2.0移动应用:5个步骤实现语音聊天和图像共享功能
- Rifx.Online
- Mobile Development , Generative AI , AI Applications
- 26 Feb, 2025
作者插图
在本教程中,我将分享我的最新项目,继续探索与 Gemini 2.0 及其 multi-modal live API 相关的实时应用。在通过多个网络应用项目深入了解 Gemini 2.0 的迷人实时能力后,包括摄像头聊天、屏幕共享互动画布和 RAG 助手,我花了一个周末构建了一个 Android 应用,以复制甚至增强与 Gemini 2.0 的多模态实时体验。以下是演示视频:
现在,让我们深入技术细节。和往常一样,我们将首先快速回顾 Gemini 2.0 提供的核心功能,并了解为什么 multi-modal live API 如此强大。Gemini 的主要优势在于其同时处理多种类型输入的能力,包括文本、音频和视频。这种多模态能力结合了 live API 的流媒体功能,使得用户可以在对话中自然、互动地进行交流,用户可以无缝切换说话、展示图像和接收响应,就像与真实的人交谈一样。live API 还能够优雅地处理中断,并在整个对话中保持上下文,这对于构建响应式移动应用程序是理想的。
然而,我们必须记住,这个 API 并不是 Google 官方的 Generative AI 套件的一部分,仍然被视为实验性。目前,它是免费的,API 密钥的速率限制为每个 API 密钥三条并发会话、每分钟 400 万 tokens,以及音频会话持续时间限制为 15 分钟,音频加视频会话限制为 2 分钟。没有可用的付费计划来增加这些限制。
系统架构
不再赘述,让我们构建这个应用程序。首先,我们需要了解这个应用程序的基本结构和数据流。请注意,这是初始版本,因此其功能并不复杂。我可能会在未来的教程中添加更高级的功能。
该过程始于用户在 Android 应用上,用户通过摄像头捕捉图像或通过麦克风录制声音。Android 应用随后处理这些媒体数据。图像被压缩并编码为 Base64
,而音频则被捕获和处理。这些处理过的媒体数据,包括图像和音频,被打包成 JSON 格式,并通过 WebSocket 连接传输到我们的 Python 服务器。在服务器端,消息被接收,音频和图像数据被转发到 Gemini 2.0 的 multi-modal live API。模型处理媒体数据并生成响应,响应可以是音频或文本。然后,后端通过 WebSocket 将此响应数据流式传输回 Android 客户端,在那里音频响应被排队并播放。
代码讲解
现在,让我们开始编码。我将向您展示整个服务器代码,使用 Python,因为它很简短。对于 Android 应用,不用担心,我不会逐行讲解。相反,我会专注于使应用正常工作的关键组件。我尽量保持结构尽可能清晰,这样即使您不是 Android 专家,仍然可以很好地理解它的运作。如果您需要整个代码库,可以访问我的 GitHub 仓库找到它。
后端 Python 服务器
后端实现与我之前的网页应用项目非常相似,因为它作为前端与 multi-modal live API 之间的桥梁,无论是移动端还是网页都没有改变。因此,如果您已经阅读过我之前的教程,例如:
您可以跳过这一部分,直接进入后面关于 Android 应用的代码讲解。
初始化
对于用 Python 编写的服务器,请确保您已安装 google-genai
包。我使用的是最新版本 0.5.0,因此请确保您有相同的版本。同时,安装 websockets
包以便于服务器与客户端之间的通信。
我们使用 websockets
库创建一个服务器,监听来自我们的 Android 应用的传入连接。
import asyncio
import json
import os
import websockets
from google import genai
import base64
async def main() -> None:
async with websockets.serve(gemini_session_handler, "0.0.0.0", 9084):
print("Running websocket server 0.0.0.0:9084...")
await asyncio.Future() # Keep the server running indefinitely
在这里,服务器设置在特定端口,例如 9084。多模态处理逻辑在 gemini_session_handler()
中处理,这是每个新的 WebSocket 连接的核心。确保服务器监听 0.0.0.0
IP 地址而不是 localhost
是很重要的,因为这允许服务器暴露于外部网络。这是必要的,以便移动客户端可以从任何外部 IP 地址连接到它,因为移动客户端可能在不同的网络上运行。google-genai
库用于创建一个能够与 Gemini Multimodal API 通信的客户端,我们在其中指定我们的 API 密钥并配置模型 gemini-2.0-flash-exp
,我们将用于处理多模态数据。gemini_session_handler()
被 websockets.serve
调用,管理 Gemini API 会话的整个生命周期。
os.environ['GOOGLE_API_KEY'] = ''
MODEL = "gemini-2.0-flash-exp"
client = genai.Client(
http_options={
'api_version': 'v1alpha',
}
)
gemini_session_handler()
这里是整个 gemini_session_handler()
函数定义:
async def gemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
"""Handles the interaction with Gemini API within a websocket session."""
try:
config_message = await client_websocket.recv()
config_data = json.loads(config_message)
config = config_data.get("setup", {})
config["system_instruction"] = "You are a daily life assistant."
async with client.aio.live.connect(model=MODEL, config=config) as session:
print("Connected to Gemini API")
async def send_to_gemini():
"""Sends messages from the client websocket to the Gemini API."""
try:
async for message in client_websocket:
try:
data = json.loads(message)
if "realtime_input" in data:
for chunk in data["realtime_input"]["media_chunks"]:
if chunk["mime_type"] == "audio/pcm":
await session.send(input={"mime_type": "audio/pcm", "data": chunk["data"]})
elif chunk["mime_type"] == "image/jpeg":
print(f"Sending image chunk: {chunk['data'][:50]}")
await session.send(input={"mime_type": "image/jpeg", "data": chunk["data"]})
except Exception as e:
print(f"Error sending to Gemini: {e}")
print("Client connection closed (send)")
except Exception as e:
print(f"Error sending to Gemini: {e}")
finally:
print("send_to_gemini closed")
async def receive_from_gemini():
"""Receives responses from the Gemini API and forwards them to the client, looping until turn is complete."""
try:
while True:
try:
print("receiving from gemini")
async for response in session.receive():
if response.server_content is None:
print(f'Unhandled server message! - {response}')
continue
model_turn = response.server_content.model_turn
if model_turn:
for part in model_turn.parts:
if hasattr(part, 'text') and part.text is not None:
await client_websocket.send(json.dumps({"text": part.text}))
elif hasattr(part, 'inline_data') and part.inline_data is not None:
print("audio mime_type:", part.inline_data.mime_type)
base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')
await client_websocket.send(json.dumps({"audio": base64_audio}))
print("audio received")
if response.server_content.turn_complete:
print('\\n<Turn complete\\>')
except websockets.exceptions.ConnectionClosedOK:
print("Client connection closed normally (receive)")
break
except Exception as e:
print(f"Error receiving from Gemini: {e}")
break
except Exception as e:
print(f"Error receiving from Gemini: {e}")
finally:
print("Gemini connection closed (receive)")
send_task = asyncio.create_task(send_to_gemini())
receive_task = asyncio.create_task(receive_from_gemini())
await asyncio.gather(send_task, receive_task)
except Exception as e:
print(f"Error in Gemini session: {e}")
finally:
print("Gemini session closed.")
在函数内部,当 WebSocket 连接建立时,我们首先从客户端获取配置数据。我们还向 config
结构添加系统指令,以指导模型的行为。在这种情况下,我指示模型成为一个“日常生活助手”。然后,我们使用 client.aio.live.connect()
连接到 Gemini API,提供之前定义的模型和配置。这建立了与 Gemini API 的连接作为一个 session
,允许服务器与 Gemini API 发送和接收数据。gemini_session_handler()
内的 send_to_gemini()
函数管理从 Android 客户端到 Gemini API 的消息流。它获取格式为 JSON 的媒体 chunks
,包含音频和图像数据,并通过 WebSocket 连接使用 session.send()
方法将其发送到 Gemini API。
接下来是 receive_from_gemini()
函数,该函数负责从 Gemini API 接收数据并将其转发回客户端。它使用一个 while 循环不断监听来自 Gemini 的新消息。收到消息后,它检查响应是文本还是音频。如果是音频响应,它提取 Base64
数据并通过 WebSocket 连接使用 client_websocket.send()
方法将其发送回客户端。它还监视 turn_complete
标志,以确保接收到完整音频轮次的所有块。这两个函数打包成两个任务并使用 asyncio.gather()
方法并发运行。
这完成了后端服务器。
Android 应用
现在,让我们切换到 Android 端。首先,您应该下载、安装并设置 Android Studio 作为 IDE,并打开从我的 GitHub 下载的项目。
简要地说,让我们检查一下布局,它保存在 activity_main.xml
中,该文件位于 app/src/main/res/layout
的主文件夹下。
Android 应用的布局
如您所见,这是一种非常简单的设计,由一个线性布局组成,其中包括一个用于图像捕获预览的 ImageView
、一个状态指示器、三个用于相机捕获、录音开始和录音停止的 Material
按钮,以及一个可选的 TextView
用于显示聊天记录,但在此项目中尚未完全实现。
现在,让我们查看 MainActivity.kt
文件,这是我们应用的主要入口点,使用 Kotlin 编写。
关键变量
在 MainActivity.kt
中,我们从布局 XML 开始,定义了一些关键变量,包括对 UI 元素的引用:imageView
、captureButton
、startButton
、stopButton
、chatLog
和 statusIndicator
。
class MainActivity : AppCompatActivity() {
private lateinit var imageView: ImageView
private lateinit var captureButton: Button
private lateinit var startButton: Button
private lateinit var stopButton: Button
private lateinit var chatLog: TextView
private lateinit var statusIndicator: ImageView
我们还有一些重要的变量用于管理连接和数据,包括:
websocket
作为我们与 Python 后端进行实时通信的核心,使用一个 URL,包括连接到 WebSocket 服务器的真实 IP 地址。确保你正确输入实际运行 Python 后端的 IP 地址。
private var webSocket: WebSocketClient? = null
private val URL = "ws://your_server_IP:9084"
isRecording
标志表示音频录制状态,audioRecord
处理麦克风音频,pcmData
存储临时音频,audioQueue
异步管理传入的音频块,audioTrack
处理解码音频响应的播放。currentFrameB64
存储 Base64 编码的图像数据。我们使用特定的压缩算法以确保与 Gemini 多模态 API 的兼容性,同时保持良好的用户体验。我们还有 recordInterval
,一个用于发送音频块的协程作业,以及 isPlaying
,以确保顺序播放。
private var currentFrameB64: String? = null
private var isRecording = false
private var audioRecord: AudioRecord? = null
private var pcmData = mutableListOf<Short>()
private var job: Job? = null
private var recordInterval: Job? = null
private val audioQueue = mutableListOf<ByteArray>()
private var isPlaying = false
private var audioTrack: AudioTrack? = null;
功能
现在,让我们来看一下关键功能。onCreate()
函数初始化 UI 组件并设置点击监听器。
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
imageView = findViewById(R.id.imageView)
captureButton = findViewById(R.id.captureButton)
startButton = findViewById(R.id.startButton)
stopButton = findViewById(R.id.stopButton)
chatLog = findViewById(R.id.chatLog)
statusIndicator = findViewById(R.id.statusIndicator)
}
checkCameraPermission()
和 checkRecordAudioPermission()
处理实时权限请求。
private fun checkCameraPermission() {
if (ContextCompat.checkSelfPermission(
this,
Manifest.permission.CAMERA
) != PackageManager.PERMISSION_GRANTED
) {
// 处理权限未授予
}
}
private fun checkRecordAudioPermission() {
if (ContextCompat.checkSelfPermission(
this,
Manifest.permission.RECORD_AUDIO
) != PackageManager.PERMISSION_GRANTED
) {
// 处理权限未授予
}
}
openCamera()
和 createImageFile()
捕获图像并将其存储在临时文件中。
private fun openCamera() {
val takePictureIntent = Intent(MediaStore.ACTION_IMAGE_CAPTURE)
val photoFile: File? = try {
createImageFile()
} catch (ex: IOException) {
// 创建文件时发生错误
null
}
// 继续进行相机意图
}
private fun createImageFile(): File {
val timeStamp: String = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.getDefault()).format(Date())
val storageDir: File = getExternalFilesDir(Environment.DIRECTORY_PICTURES)!!
return File.createTempFile(
"JPEG_${timeStamp}_",
".jpg",
storageDir
).apply {
currentPhotoPath = absolutePath
}
}
onActivityResult()
处理捕获的图像,调整大小、压缩并编码为 Base64 字符串,将生成的位图设置为预览。
override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
super.onActivityResult(requestCode, resultCode, data)
if (requestCode == CAMERA_REQUEST_CODE && resultCode == RESULT_OK) {
val file = File(currentPhotoPath)
val options = BitmapFactory.Options().apply {
inJustDecodeBounds = true
}
BitmapFactory.decodeFile(file.absolutePath, options)
val (originalWidth, originalHeight) = options.outWidth to options.outHeight
val scaleFactor = calculateScaleFactor(originalWidth, originalHeight, MAX_IMAGE_DIMENSION)
// 继续处理图像
}
}
connect()
函数建立与 WebSocket 的连接,并定义事件。
private fun connect() {
Log.d("WebSocket", "Connecting to: $URL")
webSocket = object : WebSocketClient(URI(URL)) {
override fun onOpen(handshakedata: ServerHandshake?) {
Log.d("WebSocket", "Connected")
isConnected = true
updateStatusIndicator()
sendInitialSetupMessage()
}
override fun onMessage(message: String?) {
Log.d("WebSocket", "Message Received: $message")
receiveMessage(message)
}
override fun onClose(code: Int, reason: String?, remote: Boolean) {
Log.d("WebSocket", "Connection Closed: $reason")
isConnected = false
updateStatusIndicator()
runOnUiThread {
Toast.makeText(this@MainActivity, "Connection closed", Toast.LENGTH_SHORT).show()
}
}
override fun onError(ex: Exception?) {
Log.e("WebSocket", "Error: ${ex?.message}")
isConnected = false
updateStatusIndicator()
}
}
webSocket?.connect()
}
sendInitialSetupMessage()
将配置发送到后端。sendVoiceMessage()
将音频和图像数据打包为 JSON 通过 WebSocket 发送给 Gemini。
private fun sendInitialSetupMessage() {
Log.d("WebSocket", "Sending initial setup message")
val setupMessage = JSONObject()
val setup = JSONObject()
val generationConfig = JSONObject()
val responseModalities = org.json.JSONArray()
responseModalities.put("AUDIO")
generationConfig.put("response_modalities", responseModalities)
setup.put("generation_config", generationConfig)
setupMessage.put("setup", setup)
webSocket?.send(setupMessage.toString())
}
private fun sendVoiceMessage(b64PCM: String?) {
if(webSocket?.isOpen == false){
Log.d("WebSocket", "websocket not open")
return
}
if (b64PCM == null) return
val payload = JSONObject()
val realtimeInput = JSONObject()
val mediaChunks = org.json.JSONArray()
val audioChunk = JSONObject()
// 继续打包音频数据
}
receiveMessage()
处理响应,处理文本以显示在 chatLog 中,并将音频响应传递给 injestAudioChuckToPlay()
,该函数将数据存储在队列中并调用 playNextAudioTrunk()
。playNextAudioTrunk()
函数顺序检索音频块并调用 playAudio。playAudio()
使用 Android AudioTrack()
库播放音频数据。
private fun receiveMessage(message: String?) {
if (message == null) return
val messageData = JSONObject(message)
val response = Response(messageData)
if (response.text != null) {
displayMessage("GEMINI: " + response.text)
}
if (response.audioData != null) {
injestAudioChuckToPlay(response.audioData)
}
}
private fun injestAudioChuckToPlay(base64AudioChunk: String?) {
if (base64AudioChunk == null) return
GlobalScope.launch(Dispatchers.IO) {
try {
val arrayBuffer = base64ToArrayBuffer(base64AudioChunk)
synchronized(audioQueue) {
audioQueue.add(arrayBuffer)
}
if (!isPlaying) {
playNextAudioChunk()
}
Log.d("Audio", "Audio chunk added to the queue")
} catch (e: Exception) {
Log.e("Audio", "Error processing chunk", e)
}
}
}
private fun playNextAudioChunk() {
GlobalScope.launch(Dispatchers.IO) {
while (true) {
val chunk = synchronized(audioQueue) {
if (audioQueue.isNotEmpty()) audioQueue.removeAt(0) else null
} ?: break
isPlaying = true
playAudio(chunk)
}
isPlaying = false
synchronized(audioQueue) {
if (audioQueue.isNotEmpty()) {
playNextAudioChunk()
}
}
}
}
startAudioInput()
启动音频录制,创建 AudioRecord()
,并触发协程任务以频繁录制音频。recordTrunk()
函数将 PCM 音频转换为 Base64,发送到后端,并清理临时数据。stopAudioInput()
停止音频录制,关闭 AudioRecord()
会话,并向服务器发送关闭消息。您可以在源代码库中找到这些函数的详细信息。
最后,updateStatusIndicator()
使用不同的颜色在状态图标上显示连接状态。
private fun updateStatusIndicator() {
runOnUiThread {
if (!isConnected) {
statusIndicator.setImageResource(R.drawable.baseline_error_24)
statusIndicator.setColorFilter(android.graphics.Color.RED)
} else if (!isSpeaking) {
statusIndicator.setImageResource(R.drawable.baseline_equalizer_24)
statusIndicator.setColorFilter(android.graphics.Color.GRAY)
} else {
statusIndicator.setImageResource(R.drawable.baseline_equalizer_24)
statusIndicator.setColorFilter(android.graphics.Color.GREEN)
}
}
}
亮点
这个 Android 应用的一个创新之处在于我们如何使用协程任务管理实时音频流,持续捕获音频,频繁发送每个音频块,并通过队列仔细管理音频播放。
此外,图像压缩是另一个关键改进。通过调整捕获图像的大小和压缩数据,我们在不牺牲用户体验的情况下大幅减少了图像数据的大小,这保持了网络开销低并确保了流畅的实时通信。
运行应用程序
应用程序现在准备运行。我们应该启动后端服务器和前端 Android 应用。
要启动后端服务器,只需执行 Python 脚本。您会看到 WebSocket 服务器正在“0.0.0.0:9084”上运行。这表明我们的后端服务器已成功在外部网络上运行。
接下来,在连接到 Android Studio 的测试设备上运行 Android 应用。您会在设备管理器中看到您的物理设备,然后点击运行按钮。
您现在应该在测试手机上看到干净的应用。如果出现红色警告标志,则表示连接问题。灰色指示器表示应用程序准备好进行交互。您可以点击“麦克风”按钮将指示器变为绿色并开始与 Gemini 聊天,或者点击“摄像头”按钮拍照与其聊天。
连接错误