Type something to search...
利用 CrewAI Flows 实现人工智能工作流程自动化

利用 CrewAI Flows 实现人工智能工作流程自动化

CrewAI

CrewAI 是一个尖端的 Python 框架,用于协调角色扮演和自主 AI 代理。通过构建“团队”由专业代理,我们可以自动化任务、生成创意文本格式,并以全新的方式访问信息。本文将指导您如何使用 CrewAI 的 Flow 功能创建简单项目。

流程

CrewAI 流程是一个强大的功能,旨在简化 AI 工作流程的创建和管理。流程允许开发人员与多个任务和团队协调,以创建工作流 AI 自动化。

流程允许连接多个任务,管理状态,并控制流的执行。开发人员可以设计单个工作流和多个使用 CrewAI 实现的工作流。

  1. 简化工作流创建:轻松创建多个团队和任务的复杂 AI 工作流。
  2. 状态管理:通过使用状态管理,您可以在流程中的不同任务之间管理状态。
  3. 事件驱动架构:基于事件驱动模型构建,允许动态和响应式工作流。
  4. 灵活的控制流:您可以使用条件逻辑、循环和分支来控制您的工作流。

流程执行

@start()

@start() 装饰器用于启动执行流程。当流程启动时,所有带有 start 装饰器的方法将并行执行。一个流程可以有多个 @start() 装饰器。

@start()
    async def validate_data(self):
        """Validate incoming data against predefined criteria."""
        data = self.state['data']
        # Perform validation based on specified conditions
        validation_result = 'success or falilure'
        
        # Execute validation task with result
        task_validate = Task(
            description=f'Validate data: {data}. Criteria: score > 85 and age > 30.',
            agent=dataValidationAgent,
            expected_output=validation_result  # Expected output to indicate success or failure
        )
        crew = Crew(
            agents=[dataValidationAgent],
            tasks=[task_validate],
            verbose=True,
            process=Process.sequential
        )
        result = await crew.kickoff_async()
        self.state['validation_success'] = result.raw  == 'success'

@listen()

@listen() 装饰器用于标记一个方法为 Flow 中另一个任务输出的监听器。当指定的任务发出输出时,使用 @listen() 装饰的方法将被执行。Flow 可以有多个监听装饰器,一个方法可以使用条件逻辑监听多个输出,我们将在下面讨论。

@listen((validate_data))
    async def send_notification_on_failure(self):
        """Send a notification if data validation fails."""
        if not self.state['validation_success']:
            task_notify = Task(
                description=f'Issue a notification for validation failure for data: {self.state["data"]}.',
                agent=notificationAgent,
                expected_output='Notification sent for validation failure'  # Indicating notification was sent
            )
            crew = Crew(
                agents=[notificationAgent],
                tasks=[task_notify],
                verbose=True,
                process=Process.sequential
            )
            await crew.kickoff_async()
            print("Notification sent: Validation failed.")
        else:
            print("Validation succeeded, no notification for failure needed.")

流程控制

条件逻辑:或,和,路由

Flows 中的 or_ 函数允许您监听多个方法,并在任何指定的方法发出输出时触发监听器方法。

Flows 中的 and_ 函数允许您监听多个方法,并仅在所有指定的方法发出输出时触发监听器方法。

Flows 中的 @router() 装饰器允许您根据方法的输出定义条件路由逻辑。您可以根据每个方法的输出定义多个路由,从而动态控制执行流程。

@listen(and_(analyze_data,validate_data))#listen mutiple method using and_ decorator
    async def backup_data(self):
        """Backup data after successful analysis."""
        if self.state['analysis_success']:
            task_backup = Task(
                description=f'Create a backup for analyzed data: {self.state["data"]}.',
                agent=dataBackupAgent,
                expected_output='Data backup completed successfully'  # Expected output indicating backup completion
            )
            crew = Crew(
                agents=[dataBackupAgent],
                tasks=[task_backup],
                verbose=True,
                process=Process.sequential
            )
            backup_status = await crew.kickoff_async()
            self.state['backup_done'] = backup_status == 'Data backup completed successfully'
            print("Data backup completed.")
        else:
            print("Backup skipped as data analysis was not successful.")

流状态管理

有效地管理状态对于构建可靠和可维护的 AI 工作流至关重要。CrewAI Flows 提供了强大的机制,用于非结构化和结构化状态管理,使开发人员能够选择最符合其应用需求的方法。

非结构化状态管理

在非结构化状态管理中,所有状态都存储在 Flow 类的 state 属性中。这种方法提供了灵活性,使开发人员能够动态添加或修改状态属性,而无需定义严格的模式。

class CustomDataFlow(Flow):
    def __init__(self, data):
        super().__init__()
        self.state['data'] = data  #state attribute store Unstructure state
        self.state['validation_success'] = False
        self.state['analysis_success'] = False
        self.state['backup_done'] = False

结构化状态管理

结构化状态管理利用预定义的模式确保工作流程中的一致性和类型安全。通过使用像 Pydantic 的 BaseModel 这样的模型,开发人员可以定义状态的确切形状,从而在开发环境中实现更好的验证和自动补全。

完整代码 crewAi 流程

import asyncio
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen, and_, or_
## 代理定义角色和背景故事
dataValidationAgent = Agent(
    role='数据验证器',
    goal='验证传入数据以确保其符合预定义标准,并仅输出成功或失败,没有额外的文字,如果输出为失败,则给出原因。',
    backstory="你是一个代理,负责验证给定数据的质量。",
    verbose=True
)
dataBackupAgent = Agent(
    role='数据备份代理',
    goal='安全地创建数据备份以便于恢复和安全。',
    backstory="你是一个代理,负责收集需要备份的必要信息并进行安全存储。",
    verbose=True
)
dataAnalysisAgent = Agent(
    role='数据分析师',
    goal='分析数据并生成有意义的商业洞察。',
    backstory="你是一个擅长分析数据的代理,能够从数据中提取必要信息。",
    verbose=True
)
notificationAgent = Agent(
    role='通知者',
    goal='根据各种方法的结果生成通知,仅包含必要的信息。',
    backstory="你是一个负责发送警报和更新的代理。",
    verbose=True
)
## 定义主工作流程
class CustomDataFlow(Flow):
    def __init__(self, data):
        super().__init__()
        self.state['data'] = data
        self.state['validation_success'] = False
        self.state['analysis_success'] = False
        self.state['backup_done'] = False
    @start()
    async def validate_data(self):
        """根据预定义标准验证传入数据。"""
        data = self.state['data']
        # 根据指定条件执行验证
        validation_result = '成功或失败'
        
        # 执行带有结果的验证任务
        task_validate = Task(
            description=f'验证数据: {data}. 标准: score > 85 和 age > 30。',
            agent=dataValidationAgent,
            expected_output=validation_result  # 预期输出以指示成功或失败
        )
        crew = Crew(
            agents=[dataValidationAgent],
            tasks=[task_validate],
            verbose=True,
            process=Process.sequential
        )
        result = await crew.kickoff_async()
        self.state['validation_success'] = result.raw  == '成功'
    @listen((validate_data))
    async def send_notification_on_failure(self):
        """如果数据验证失败,则发送通知。"""
        if not self.state['validation_success']:
            task_notify = Task(
                description=f'发出验证失败的通知,数据: {self.state["data"]}。',
                agent=notificationAgent,
                expected_output='已发送验证失败通知'  # 表示通知已发送
            )
            crew = Crew(
                agents=[notificationAgent],
                tasks=[task_notify],
                verbose=True,
                process=Process.sequential
            )
            await crew.kickoff_async()
            print("通知已发送: 验证失败。")
        else:
            print("验证成功,无需发送失败通知。")
    @listen((validate_data))
    async def analyze_data(self):
        """仅在验证成功时分析数据。"""
        if self.state['validation_success']:
            task_analyze = Task(
                description=f'分析验证过的数据: {self.state["data"]} 以提取有意义的洞察。',
                agent=dataAnalysisAgent,
                expected_output='数据分析成功完成'  # 预期输出以指示分析成功
            )
            crew = Crew(
                agents=[dataAnalysisAgent],
                tasks=[task_analyze],
                verbose=True,
                process=Process.sequential
            )
            analysis_result = await crew.kickoff_async()
            self.state['analysis_success'] = analysis_result == '数据分析成功完成'
        else:
            print("由于数据验证未成功,跳过分析。")
    @listen(and_(analyze_data,validate_data))
    async def backup_data(self):
        """在成功分析后备份数据。"""
        if self.state['analysis_success']:
            task_backup = Task(
                description=f'为分析过的数据创建备份: {self.state["data"]}。',
                agent=dataBackupAgent,
                expected_output='数据备份成功完成'  # 预期输出以指示备份完成
            )
            crew = Crew(
                agents=[dataBackupAgent],
                tasks=[task_backup],
                verbose=True,
                process=Process.sequential
            )
            backup_status = await crew.kickoff_async()
            self.state['backup_done'] = backup_status == '数据备份成功完成'
            print("数据备份完成。")
        else:
            print("由于数据分析未成功,跳过备份。")
    @listen(or_(send_notification_on_failure, backup_data))
    async def send_final_notification(self):
        """发送最终通知,汇总数据处理结果。"""
        if not self.state['validation_success']:
            message = "数据验证失败,无需备份。"
        elif self.state['backup_done']:
            message = "数据成功分析并备份。已发出最终通知。"
        else:
            message = "数据处理完成,但有警告。"
        task_notify = Task(
            description=f'发出最终通知: {message},数据: {self.state["data"]}。',
            agent=notificationAgent,
            expected_output=f'最终通知已发送: {message}'  # 表示最终通知结果
        )
        crew = Crew(
            agents=[notificationAgent],
            tasks=[task_notify],
            verbose=True,
            process=Process.sequential
        )
        await crew.kickoff_async()
        print("最终通知已发送。")
## 主入口点以执行流程
async def main():
    data_flow = CustomDataFlow(data={
        "id": 1,
        "name": "Kuldeep",
        "age": 32,
        "country": "印度",
        "score": 85.5,
        "status": "active"
    })
    data_flow.plot()  # 可视化流程
    await data_flow.kickoff()
if __name__ == "__main__":
    asyncio.run(main())

流程

结论:

Crew AI 转变了多智能体系统的开发和管理,提供了一个强大的框架来构建先进的 AI 解决方案。通过利用智能体、任务和工具,Flows 用户可以创建高效且协作的 AI 系统,以满足特定需求。通过使用 Flow,您可以管理每个智能体的状态,这对于处理复杂任务非常有用。

参考

https://docs.crewai.com/concepts/flowshttps://stackoverflow.com/questions/tagged/crewai

关于作者:

Kuldeep Yadav 开始了他的旅程,成为了 CodeStax.Ai 的软件工程师。他喜欢探索多个领域,并热衷于以高效的方式解决问题。

关于 CodeStax.Ai

CodeStax.AI ,我们处于创新与企业解决方案的交汇点,提供技术合作伙伴关系,使企业能够推动效率、创新和增长,利用无代码平台和先进的 AI 集成的变革力量。

但真正的魔力是什么?是我们在幕后支持的技术团队。如果你对创新有敏锐的洞察力,并热衷于重新定义常规,我们为你提供了完美的技术游乐场。CodeStax.Ai 提供的不仅仅是一份工作——这是一段深入未来核心的旅程。加入我们,成为重新定义企业技术格局的革命的一部分。

Related Posts

使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
10 个强大的 Perplexity AI 提示,让您的营销任务自动化

10 个强大的 Perplexity AI 提示,让您的营销任务自动化

在当今快速变化的数字世界中,营销人员总是在寻找更智能的方法来简化他们的工作。想象一下,有一个个人助理可以为您创建受众档案,建议营销策略,甚至为您撰写广告文案。这听起来像是一个梦想? 多亏了像 Perplexity 这样的 AI 工具,这个梦想现在成为现实。通过正确的提示,您可以将 AI 转变为您的 个人营销助理。在本文中,我将分享 10 个强大的提示,帮助您自动

阅读更多
10+ 面向 UI/UX 设计师的顶级 ChatGPT 提示

10+ 面向 UI/UX 设计师的顶级 ChatGPT 提示

人工智能技术,如机器学习、自然语言处理和数据分析,正在重新定义传统设计方法。从自动化重复任务到实现个性化用户体验,人工智能使设计师能够更加专注于战略思维和创造力。随着这一趋势的不断增长,UI/UX 设计师越来越多地采用 AI 驱动的工具来促进他们的工作。利用人工智能不仅能提供基于数据的洞察,还为满足多样化用户需求的创新设计解决方案开辟了机会。 1. 用户角色开发 目的

阅读更多
在几分钟内完成数月工作的 100 种人工智能工具

在几分钟内完成数月工作的 100 种人工智能工具

人工智能(AI)的快速发展改变了企业的运作方式,使人们能够在短短几分钟内完成曾经需要几周或几个月的任务。从内容创作到网站设计,AI工具帮助专业人士节省时间,提高生产力,专注于创造力。以下是按功能分类的100个AI工具的全面列表,以及它们在现实世界中的使用实例。 1. 研究工具 研究可能耗时,但人工智能工具使查找、分析和组织数据变得更加容易。**ChatGPT, Cop

阅读更多
你从未知道的 17 个令人惊叹的 GitHub 仓库

你从未知道的 17 个令人惊叹的 GitHub 仓库

Github 隐藏的宝石!! 立即收藏的代码库 学习编程相对简单,但掌握编写更好代码的艺术要困难得多。GitHub 是开发者的宝藏,那里“金子”是其他人分享的精心编写的代码。通过探索 GitHub,您可以发现如何编写更清晰的代码,理解高质量代码的样子,并学习成为更熟练开发者的基本步骤。 1. notwaldorf/emoji-translate *谁需

阅读更多