AI Workflow Automation with CrewAI Flows
- Rifx.Online
- Programming , Machine Learning , Data Science
- 07 Dec, 2024
CrewAI
CrewAI is cutting-edge Python framework for orchestrating role-playing, autonomous AI agents.By building “crews” of specialized agents, we can automate tasks, generate creative text formats, and access information in a whole new way.This article guide you how to create simple project using crewAI usingFlow feature .
Flows
CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allows to developer co-ordinate with mutiple task and Crews to create Workflow AI automation.
Flows allow to connect multiple task , manage state ,and control the flow execution. developer can design single workflow and mutiple which implement using crewAI.
- Simplified Workflow Creation: Easily create multiple Crews and tasks complex AI workflows.
- State Management: By using state mangement you can manage state between different task within your flow.
- Event-Driven Architecture: By using Built on an event-driven model, allowing for dynamic and responsive workflows.
- Flexible Control Flow: you can control your workflow with condition logic , loops and branching.
Flow Execution
@start()
The @start()
decorator is used to start the flow of execution.when a flowis started all the method with start decorator executing in parrallel. flow can have multiple @start()
decorator .
@start()
async def validate_data(self):
"""Validate incoming data against predefined criteria."""
data = self.state['data']
# Perform validation based on specified conditions
validation_result = 'success or falilure'
# Execute validation task with result
task_validate = Task(
description=f'Validate data: {data}. Criteria: score > 85 and age > 30.',
agent=dataValidationAgent,
expected_output=validation_result # Expected output to indicate success or failure
)
crew = Crew(
agents=[dataValidationAgent],
tasks=[task_validate],
verbose=True,
process=Process.sequential
)
result = await crew.kickoff_async()
self.state['validation_success'] = result.raw == 'success'
@listen()
The @listen()
decorator is used to mark a method as a listener for the output of another task in the Flow. The method decorated with @listen()
will be executed when the specified task emits an output. Flow can have multiple listen decorator and one method can listen mutiple mutiple usingconditional logic which we will decuss below.
@listen((validate_data))
async def send_notification_on_failure(self):
"""Send a notification if data validation fails."""
if not self.state['validation_success']:
task_notify = Task(
description=f'Issue a notification for validation failure for data: {self.state["data"]}.',
agent=notificationAgent,
expected_output='Notification sent for validation failure' # Indicating notification was sent
)
crew = Crew(
agents=[notificationAgent],
tasks=[task_notify],
verbose=True,
process=Process.sequential
)
await crew.kickoff_async()
print("Notification sent: Validation failed.")
else:
print("Validation succeeded, no notification for failure needed.")
Flow Control
Conditional Logic: or , and, router
The or_
function in Flows allow you to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.
The and_
function in Flows allow you to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.
The @router()
decorator in Flows allows you to define conditional routing logic based on the output of a method. You can define mutiple routes based on the output of the each method that allow us to control the flow of execution dynamically.
@listen(and_(analyze_data,validate_data))#listen mutiple method using and_ decorator
async def backup_data(self):
"""Backup data after successful analysis."""
if self.state['analysis_success']:
task_backup = Task(
description=f'Create a backup for analyzed data: {self.state["data"]}.',
agent=dataBackupAgent,
expected_output='Data backup completed successfully' # Expected output indicating backup completion
)
crew = Crew(
agents=[dataBackupAgent],
tasks=[task_backup],
verbose=True,
process=Process.sequential
)
backup_status = await crew.kickoff_async()
self.state['backup_done'] = backup_status == 'Data backup completed successfully'
print("Data backup completed.")
else:
print("Backup skipped as data analysis was not successful.")
Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, allowing developers to choose the approach that best fits their application’s needs.
Unstructured State Management
In unstructured state management, all state is stored in the state
attribute of the Flow
class. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
class CustomDataFlow(Flow):
def __init__(self, data):
super().__init__()
self.state['data'] = data #state attribute store Unstructure state
self.state['validation_success'] = False
self.state['analysis_success'] = False
self.state['backup_done'] = False
Structured State Management
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. By using models like Pydantic’s BaseModel
, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
Full Code crewAi flow
import asyncio
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen, and_, or_
## Agent where define role and backstory
dataValidationAgent = Agent(
role='Data Validator',
goal='Validate incoming data to ensure it meets predefined criteria. and give output only success or failure nothing extra word if output is failure then give reason ',
backstory="you are an agent to verify given data with qualaty ",
verbose=True
)
dataBackupAgent = Agent(
role='Data Backup Agent',
goal='Securely create a backup of data for recovery and safety.',
backstory="you are an agent to taking which neccssary information need to take backup and store for safety.",
verbose=True
)
dataAnalysisAgent = Agent(
role='Data Analyst ',
goal=' analyze data and generate meaningful insights. whcich is related for business',
backstory="you are an agent skilled in analyzing data and what neccessary information within data. ",
verbose=True
)
notificationAgent = Agent(
role='Notifier',
goal='generate notification based various method outcomes wtih neccessary information only.',
backstory="you are an agent responsible for deliver alerts and update.",
verbose=True
)
## Define the main workflow
class CustomDataFlow(Flow):
def __init__(self, data):
super().__init__()
self.state['data'] = data
self.state['validation_success'] = False
self.state['analysis_success'] = False
self.state['backup_done'] = False
@start()
async def validate_data(self):
"""Validate incoming data against predefined criteria."""
data = self.state['data']
# Perform validation based on specified conditions
validation_result = 'success or falilure'
# Execute validation task with result
task_validate = Task(
description=f'Validate data: {data}. Criteria: score > 85 and age > 30.',
agent=dataValidationAgent,
expected_output=validation_result # Expected output to indicate success or failure
)
crew = Crew(
agents=[dataValidationAgent],
tasks=[task_validate],
verbose=True,
process=Process.sequential
)
result = await crew.kickoff_async()
self.state['validation_success'] = result.raw == 'success'
@listen((validate_data))
async def send_notification_on_failure(self):
"""Send a notification if data validation fails."""
if not self.state['validation_success']:
task_notify = Task(
description=f'Issue a notification for validation failure for data: {self.state["data"]}.',
agent=notificationAgent,
expected_output='Notification sent for validation failure' # Indicating notification was sent
)
crew = Crew(
agents=[notificationAgent],
tasks=[task_notify],
verbose=True,
process=Process.sequential
)
await crew.kickoff_async()
print("Notification sent: Validation failed.")
else:
print("Validation succeeded, no notification for failure needed.")
@listen((validate_data))
async def analyze_data(self):
"""Analyze data only if validation succeeds."""
if self.state['validation_success']:
task_analyze = Task(
description=f'Analyze validated data: {self.state["data"]} to extract meaningful insights.',
agent=dataAnalysisAgent,
expected_output='Data analysis completed successfully' # Expected output indicating successful analysis
)
crew = Crew(
agents=[dataAnalysisAgent],
tasks=[task_analyze],
verbose=True,
process=Process.sequential
)
analysis_result = await crew.kickoff_async()
self.state['analysis_success'] = analysis_result == 'Data analysis completed successfully'
else:
print("Skipping analysis as data validation did not succeed.")
@listen(and_(analyze_data,validate_data))
async def backup_data(self):
"""Backup data after successful analysis."""
if self.state['analysis_success']:
task_backup = Task(
description=f'Create a backup for analyzed data: {self.state["data"]}.',
agent=dataBackupAgent,
expected_output='Data backup completed successfully' # Expected output indicating backup completion
)
crew = Crew(
agents=[dataBackupAgent],
tasks=[task_backup],
verbose=True,
process=Process.sequential
)
backup_status = await crew.kickoff_async()
self.state['backup_done'] = backup_status == 'Data backup completed successfully'
print("Data backup completed.")
else:
print("Backup skipped as data analysis was not successful.")
@listen(or_(send_notification_on_failure, backup_data))
async def send_final_notification(self):
"""Send a final notification summarizing the data processing outcomes."""
if not self.state['validation_success']:
message = "Data validation failed. No backup needed."
elif self.state['backup_done']:
message = "Data successfully analyzed and backed up. Final notification issued."
else:
message = "Data processing completed with warnings."
task_notify = Task(
description=f'Issue final notification: {message} for data: {self.state["data"]}.',
agent=notificationAgent,
expected_output=f'Final notification sent: {message}' # Indicating the final notification outcome
)
crew = Crew(
agents=[notificationAgent],
tasks=[task_notify],
verbose=True,
process=Process.sequential
)
await crew.kickoff_async()
print("Final notification sent.")
## Main entry point to execute the flow
async def main():
data_flow = CustomDataFlow(data={
"id": 1,
"name": "Kuldeep",
"age": 32,
"country": "INDIA",
"score": 85.5,
"status": "active"
})
data_flow.plot() # for visualize the flow
await data_flow.kickoff()
if __name__ == "__main__":
asyncio.run(main())
Flow
Conclusion:
Crew AI transforms the development and management of multi-agent systems, providing a strong framework to craft advanced AI solutions. Utilizing agents, tasks, and tools,Flows users can create efficient and collaborative AI systems customized for precise requirements.By using Flow you can manage state of each Agent works which canpowerfull to make complex task.
Reference
https://docs.crewai.com/concepts/flowshttps://stackoverflow.com/questions/tagged/crewai
About the Author:
Kuldeep Yadav started his journey as a Software Engineer at CodeStax.Ai. He loves to explore multiple domains and loves to solve problems in an efficient manner.
About CodeStax.Ai
At CodeStax.AI, we stand at the nexus of innovation and enterprise solutions, offering technology partnerships that empower businesses to drive efficiency, innovation, and growth, harnessing the transformative power of no-code platforms and advanced AI integrations.
But what is the real magic? It’s our tech tribe behind the scenes. If you have a knack for innovation and a passion for redefining the norm, we have the perfect tech playground for you. CodeStax. Ai offers more than a job — it’s a journey into the very heart of what’s next. Join us and be part of the revolution that’s redefining the enterprise tech landscape.