Mastering CrewAI: Chapter 2— Flows

Event-Driven AI Workflows

Flows allow us to orchestrate tasks and manage Crews.

  • We can chain together multiple Crews and tasks to construct AI workflows.
  • Flows share state between different tasks within a workflow.
  • Flows are event-driven: tasks can trigger subsequent tasks based on specific events.
  • We can implement conditional logic, loops, and branching within workflows.

To get started, create a new flow project from the command line:

crewai create flow flow-example

This creates a new template project for us. Inside its main.py file, we have the following code.

#!/usr/bin/env python
from random import randint

from pydantic import BaseModel

from crewai.flow.flow import Flow, listen, start

from .crews.poem_crew.poem_crew import PoemCrew


class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""


class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = (
            PoemCrew()
            .crew()
            .kickoff(inputs={"sentence_count": self.state.sentence_count})
        )

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)


def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot()


if __name__ == "__main__":
    kickoff()

The PoemState class inherits from BaseModel and is used to manage the workflow’s state. It has:

  • sentence_count: Tracks the number of sentences to generate.
  • poem: Holds the generated poem as a string.
class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""
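
To see how such a state object behaves on its own, here is a minimal sketch. It swaps pydantic's BaseModel for a standard-library dataclass so that it runs without extra dependencies; that swap and the name PoemStateSketch are assumptions for illustration, and the real template uses BaseModel. It shows that a fresh state starts with the defaults and that a step simply reads and writes its attributes.

```python
from dataclasses import dataclass
from random import randint


@dataclass
class PoemStateSketch:
    # Hypothetical stand-in for PoemState, with the same fields and defaults.
    sentence_count: int = 1
    poem: str = ""


state = PoemStateSketch()
# Capture the values a fresh state holds before any step has run.
defaults = (state.sentence_count, state.poem)

# A step mutates the shared state in place; randint includes both endpoints.
state.sentence_count = randint(1, 5)
state.poem = "placeholder poem text"

print(defaults)
print(state)
```

Because every step works on the same object, whatever one step writes is visible to the steps that run after it.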

The PoemFlow class inherits from Flow and implements the workflow logic. Each method represents a step in the workflow.

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

generate_sentence_count is the entry point for the workflow, marked with the @start decorator.

It only updates the workflow state, setting self.state.sentence_count to a random integer from 1 to 5.

generate_poem is triggered after generate_sentence_count completes, using the @listen(generate_sentence_count) decorator.

Its job is to call PoemCrew to generate a poem with the number of sentences stored in the state.

The result (result.raw) is stored in the state (self.state.poem).

class PoemFlow(Flow[PoemState]):
    ...

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = (
            PoemCrew()
            .crew()
            .kickoff(inputs={"sentence_count": self.state.sentence_count})
        )

        print("Poem generated", result.raw)
        self.state.poem = result.raw

I will not go into the details of PoemCrew since we already covered Crews in the previous chapter. It is simply a Crew, and generate_poem is used to initiate it.

save_poem is triggered after the poem is generated, through the @listen(generate_poem) decorator. It writes the poem held in the state to poem.txt.
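
The chain described above (generate_sentence_count, then generate_poem, then save_poem) can be pictured with a toy model in plain Python. This is only a sketch of the idea of listeners and is not how CrewAI implements it; the listen and run helpers below are hypothetical and written just for this illustration.

```python
# Toy model of a start -> listen chain; NOT CrewAI's implementation.
listeners = {}   # maps a trigger's function name to the steps waiting on it
call_order = []  # records which steps ran, in order


def listen(trigger):
    """Register the decorated function to run after `trigger` finishes."""
    def decorator(func):
        listeners.setdefault(trigger.__name__, []).append(func)
        return func
    return decorator


def run(step):
    """Run a step, then run every step that listens for it."""
    step()
    call_order.append(step.__name__)
    for follower in listeners.get(step.__name__, []):
        run(follower)


def generate_sentence_count():
    pass  # the real step would set the sentence count in the state


@listen(generate_sentence_count)
def generate_poem():
    pass  # the real step would call the crew


@listen(generate_poem)
def save_poem():
    pass  # the real step would write the poem to a file


run(generate_sentence_count)
print(call_order)
```

Each step only declares what it waits for; the order of execution follows from those declarations rather than from explicit calls between the steps.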

Let’s run it.

First, add these lines to the top of flow_example/src/flow_example/crews/poem_crew/poem_crew.py so that environment variables are loaded from a .env file:

from dotenv import load_dotenv
load_dotenv()

Then:

python src/flow_example/main.py
Generating sentence count
Generating poem
## Agent: CrewAI Poem Writer
### Task: Write a poem about how CrewAI is awesome. Ensure the poem is engaging and adheres to the specified sentence count of 1.



## Agent: CrewAI Poem Writer
### Final Answer: 
In a whirlwind of bytes and brilliance, CrewAI dances through data, spinning solutions with a digital grace that leaves us all in awe.


Poem generated In a whirlwind of bytes and brilliance, CrewAI dances through data, spinning solutions with a digital grace that leaves us all in awe.
Saving poem

It created a poem.txt file and wrote the poem to it.

In a whirlwind of bytes and brilliance, CrewAI dances through data, spinning solutions with a digital grace that leaves us all in awe.

When we run the Python code, as shown above, it executes the kickoff method.

We could also run the plot method, which generates an HTML file to display the flow created by the code.

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot()


if __name__ == "__main__":
    #kickoff()
    plot()

When we run this code (calling only the plot function), it produces the HTML visualization of the flow.

In essence, what we have done here is control the flow and procedure of a single Crew, PoemCrew. We created a class (PoemState) to manage the state data, established an entry point for our workflow, allowed the Crew to perform its task, and finally set the concluding stage before wrapping up.

It is also possible to do more; we can control the flow using or_, and_, and @router.

The or_ function lets a listener method be triggered when any one of the specified methods emits an output.

from crewai.flow.flow import Flow, listen, or_, start

class ExampleFlow(Flow):
    @start()
    def method_a(self):
        return "Output A"

    @start()
    def method_b(self):
        return "Output B"

    @listen(or_(method_a, method_b))
    def listener_method(self, output):
        print(f"Triggered by: {output}")

If either method_a or method_b completes, listener_method will be triggered.
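
The "any of them" rule can be written down as a small predicate. The sketch below is a plain-Python illustration of that rule only; or_ready is a hypothetical helper, not part of CrewAI.

```python
def or_ready(triggers, finished):
    """Toy rule: an or_-style listener is ready once ANY trigger has finished."""
    return any(name in finished for name in triggers)


triggers = ["method_a", "method_b"]

print(or_ready(triggers, []))                        # nothing has finished yet
print(or_ready(triggers, ["method_b"]))              # one trigger is enough
print(or_ready(triggers, ["method_a", "method_b"]))  # both finished also qualifies
```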

The and_ function lets a listener method be triggered only when all of the specified methods have emitted an output.

from crewai.flow.flow import Flow, listen, and_, start

class ExampleFlow(Flow):
    @start()
    def method_x(self):
        return "Output X"

    @start()
    def method_y(self):
        return "Output Y"

    @listen(and_(method_x, method_y))
    def listener_method(self, outputs):
        print(f"Triggered after: {outputs}")

listener_method is triggered only after both method_x and method_y have emitted their outputs.
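
The "all of them" rule means the listener waits for whichever required method finishes last. The following plain-Python sketch illustrates that rule; fires_after is a hypothetical helper written for this example and is not CrewAI code.

```python
def fires_after(required, completion_order):
    """Return the name of the step whose completion satisfies an and_-style
    listener, or None if some required step never finishes (toy model)."""
    finished = set()
    for name in completion_order:
        finished.add(name)
        # The listener becomes ready once every required step has finished.
        if set(required) <= finished:
            return name
    return None


required = ["method_x", "method_y"]

print(fires_after(required, ["method_x", "method_y"]))
print(fires_after(required, ["method_y", "method_x"]))
print(fires_after(required, ["method_x"]))
```

Whatever the completion order, the listener becomes ready only at the last required step, and it never becomes ready if one of them is missing.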

The @router() decorator allows conditional routing within a workflow based on the output of a method.

import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.kickoff()

second_method routes the flow to either "success" or "failed" paths based on the success_flag state. Depending on the route, either third_method (for success) or fourth_method (for failure) is executed, ensuring the workflow adapts to the state dynamically.
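
Stripped of the decorators, the routing idea is a label returned by one function that selects which function runs next. Here is a minimal plain-Python sketch of that dispatch, using a dictionary in place of @listen("success") and @listen("failed"). The route, handlers, and run_flow names are assumptions made for this illustration and are not part of CrewAI.

```python
def route(success_flag):
    """Return the label that selects the next step, as second_method does."""
    return "success" if success_flag else "failed"


def third_method():
    return "Third method running"


def fourth_method():
    return "Fourth method running"


# Hypothetical dispatch table standing in for the string-based listeners.
handlers = {"success": third_method, "failed": fourth_method}


def run_flow(success_flag):
    """Route on the flag, then run the handler registered for that label."""
    label = route(success_flag)
    return label, handlers[label]()


print(run_flow(True))
print(run_flow(False))
```

Passing the flag in explicitly, instead of drawing it at random, makes both branches easy to exercise.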

Sources

https://docs.crewai.com/concepts/flows
