Flowpipe: A Cloud Scripting Engine for DevOps Workflows

April 30, 2024 By Mark Otto

Flowpipe is an open source tool built for DevOps teams. You can express pipelines, steps, and triggers in HCL, the familiar DevOps language, to orchestrate tasks that query AWS (and other services you regularly use). Compose pipelines using mods, and mix in SQL, AWS Lambda-compatible functions, or containers as needed. Develop, test, and run it all on your local machine, with no deployment surprises or long debug cycles. Then schedule the pipelines or respond to events in real time. Use it to:

  • Orchestrate your cloud. Build simple steps into complex workflows. Run and test locally. Compose solutions across clouds using open source mods.
  • Connect people and tools. Connect your cloud data to people and systems using email, chat & APIs. Workflow steps can even run containers, custom functions, and more.
  • Respond to events. Run workflows manually or on a schedule. Trigger pipelines from webhooks or changes in data.
  • Use code, not clicks. Build and deploy DevOps workflows like infrastructure. Code in HCL and deploy from version control.

Here’s a Flowpipe command to deactivate expired AWS Identity and Access Management (IAM) access keys, and notify a Slack channel with messages like “The access key KEY_ID for user USER_NAME has been deactivated.”

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys \
  --arg slack_channel=iam_hygiene

Let’s break this down.

flowpipe

The binary that you run on your local machine, or in a cloud VM or CI/CD pipeline.

flowpipe pipeline

The subcommand to list, view, and run pipelines.

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys

Invocation of a pipeline that checks for expired keys, deactivates them, and sends a Slack notification.

--arg slack_channel=iam_hygiene

The channel to notify.

If you want to try this yourself, here’s the complete recipe.

  1. Download and install Flowpipe.
  2. Clone the samples repo:

git clone https://github.com/turbot/flowpipe-samples

  3. Visit the subdirectory for this sample:

cd flowpipe-samples/public_cloud/deactivate_expired_aws_iam_access_keys

  4. Install dependencies:

flowpipe mod install

In this case, the sample relies on two library mods: AWS and Slack.

flowpipe mod list

deactivate_expired_aws_iam_access_keys
├── github.com/turbot/flowpipe-mod-aws@…
└── github.com/turbot/flowpipe-mod-slack@…

  5. Set up credentials:

export AWS_PROFILE={YOUR_PROFILE_NAME}
export SLACK_TOKEN={YOUR_SLACK_TOKEN}

Alternatively you could use Flowpipe credentials to manage these secrets.

  6. Run the command:

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys \
  --arg slack_channel=iam_hygiene \
  --arg aws_expire_after_days=60

This example also sets the argument aws_expire_after_days to 60, overriding its default of 90 days.

Show me the code!

Like all Flowpipe samples and libraries, this sample is available on GitHub under an Apache 2.0 license. Because it relies on the AWS and Slack libraries, the sample’s code is concise.


pipeline "deactivate_expired_aws_iam_access_keys" {
  title       = "Deactivate expired AWS IAM access keys"
  description = "Deactivates expired AWS IAM access keys and notifies via Slack channel."

  param "slack_channel" {
    type        = string
    description = "Channel, private group, or IM channel to send message to."
  }

  param "aws_expire_after_days" {
    type        = number
    description = "Number of days after which the access key should be deactivated."
    default     = 90
  }

  step "pipeline" "list_iam_users" {
    pipeline = aws.pipeline.list_iam_users
  }

  step "pipeline" "list_iam_access_keys" {
    for_each = { for user in step.pipeline.list_iam_users.output.users : user.UserName => user.UserName }
    pipeline = aws.pipeline.list_iam_access_keys
    args = {
      cred      = param.aws_cred
      user_name = each.value
    }
  }

  step "pipeline" "update_iam_access_key" {
    for_each = flatten([for accessKey in step.pipeline.list_iam_access_keys : accessKey.output.access_keys])

    # Run only if the access key is active and older than specified number of days.
    if = each.value.Status == "Active" && timecmp(each.value.CreateDate, timeadd(timestamp(), "-${param.aws_expire_after_days * 24}h")) < 0

    pipeline = aws.pipeline.update_iam_access_key
    args = {
      cred          = param.aws_cred
      user_name     = each.value.UserName
      access_key_id = each.value.AccessKeyId
      status        = "Inactive"
    }
  }

  step "pipeline" "post_message" {
    for_each = flatten([for accessKey in step.pipeline.list_iam_access_keys : accessKey.output.access_keys])

    # Run only if the access key is active and older than specified number of days.
    if = each.value.Status == "Active" && timecmp(each.value.CreateDate, timeadd(timestamp(), "-${param.aws_expire_after_days * 24}h")) < 0

    pipeline = slack.pipeline.post_message
    args = {
      cred    = param.slack_cred
      channel = param.slack_channel
      text    = "The access key ${each.value.AccessKeyId} for user ${each.value.UserName} has been deactivated."
    }
  }
}

Like all pipelines, this one comprises a set of steps. In this case there are four: two that list IAM users and their access keys, one to deactivate expired access keys, and one to notify Slack. All four are of type pipeline, that is, steps that invoke other pipelines. But a pipeline can use any or all of these step types.

Open source Flowpipe step types table

For example, the helper pipeline aws.pipeline.list_iam_access_keys uses a container step to invoke the AWS CLI.

step "container" "list_iam_access_keys" {
  image = "public.ecr.aws/aws-cli/aws-cli"
  cmd = concat(
    ["iam", "list-access-keys"],
    param.user_name != null ? ["--user-name", "${param.user_name}"] : []
  )
  env = credential.aws[param.cred].env
}

With Flowpipe there’s always more than one way to do something, though. For example, Flowpipe does not require Steampipe, but will happily use it. So if you’ve installed Steampipe along with the AWS plugin, Flowpipe has access to the 460+ tables provided by the plugin, and you could alternatively do this.

step "query" "list_iam_access_keys" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  sql = <<EOQ
    select
      user_name,
      create_date,
      status
    from
      aws_iam_access_key
  EOQ
}
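Other step types slot into a pipeline the same way. As a minimal sketch, not taken from the sample, here is a pipeline that combines an http step with a transform step. The pipeline name, step names, and the public endpoint are assumptions chosen for illustration.

```hcl
# Hypothetical pipeline (not part of the sample) mixing two step types.
pipeline "show_public_ip" {

  # http step: call an endpoint that returns JSON such as {"ip": "..."}.
  # The URL is an assumption used only for illustration.
  step "http" "get_ip" {
    url = "https://api.ipify.org?format=json"
  }

  # transform step: build a message from the parsed JSON response.
  # Referring to step.http.get_ip makes this step wait for it.
  step "transform" "message" {
    value = "Public IP is ${step.http.get_ip.response_body.ip}"
  }

  # Expose the result as a pipeline output.
  output "message" {
    value = step.transform.message.value
  }
}
```

No explicit ordering is declared here: the reference from the transform step to the http step is what sequences them.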

Flowpipe HCL

Terraform supports a declarative approach to defining infrastructure. Flowpipe, similarly, enables declarative definition of workflow. DevOps pros will be instantly familiar with the basics of Flowpipe HCL: the core ingredients are blocks, arguments, and expressions; there’s no intrinsic ordering of blocks but you can impose an order when block B refers to block A; steps are packaged as modules that you can reuse and combine.

The same built-in expressions, meta-arguments, and functions are available. Examples shown above include the for expression, the for_each meta-argument (and its associated each object), and the functions flatten, timecmp, and timeadd. In addition to the standard Terraform functions, Flowpipe defines a few of its own including several related to error handling.
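To see how those functions combine in the sample's if condition, here is a small sketch that isolates the date arithmetic. The pipeline name, step names, and the literal creation date are hypothetical; only the expression itself comes from the sample.

```hcl
# Hypothetical pipeline that isolates the sample's expiry test.
pipeline "show_expiry_cutoff" {

  param "aws_expire_after_days" {
    type    = number
    default = 90
  }

  # 90 days * 24 hours = 2160, so the duration string becomes "-2160h"
  # and timeadd() returns a timestamp 90 days before now.
  step "transform" "cutoff" {
    value = timeadd(timestamp(), "-${param.aws_expire_after_days * 24}h")
  }

  # timecmp() returns -1 when its first argument is earlier than its
  # second, so "< 0" means "created before the cutoff".
  # The creation date below is an illustrative literal.
  step "transform" "is_expired" {
    value = timecmp("2023-01-01T00:00:00Z", step.transform.cutoff.value) < 0
  }

  output "is_expired" {
    value = step.transform.is_expired.value
  }
}
```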

There are also a number of built-in step arguments used for flow control, error handling, and documentation.

common step arguments in Flowpipe
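As a rough sketch of how a few of these arguments might be used together, the pipeline below retries a flaky HTTP call, ignores a final failure, and reports the outcome. It assumes the retry and error blocks and the is_error and error_message functions work as described in the Flowpipe documentation; the pipeline name, step names, and URL are hypothetical.

```hcl
# Hypothetical pipeline showing flow-control and error-handling arguments.
pipeline "resilient_lookup" {

  step "http" "lookup" {
    url = "https://api.example.com/lookup" # placeholder URL (assumption)

    # Retry the request a few times before treating it as failed.
    retry {
      max_attempts = 3
    }

    # If it still fails, let the pipeline continue instead of stopping.
    error {
      ignore = true
    }
  }

  # Report the outcome either way, using Flowpipe's error-handling functions.
  step "transform" "report" {
    value = is_error(step.http.lookup) ? "Lookup failed: ${error_message(step.http.lookup)}" : "Lookup succeeded"
  }

  output "report" {
    value = step.transform.report.value
  }
}
```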

With the addition of these affordances, Flowpipe HCL behaves more like a full-fledged programming language than Terraform HCL, which is primarily a resource configuration language.

Composable Mods and the Flowpipe Hub

Flowpipe mods are open source composable pipelines so you can remix and reuse your code — or build on the great work of the community. The Flowpipe Hub is a searchable directory of mods to discover and use. The source code for mods is available on GitHub if you’d like to learn or contribute.

Flowpipe library mods make it easy to work with common services including AWS, GitHub, Jira, Slack, Teams, Zendesk … and many more!

Flowpipe sample mods are ready-to-run samples that demonstrate patterns and use of various library mods.

It’s easy to create your own mod that runs standalone, perhaps calling dependent mods as shown above. And any mod you create can itself be called by another. So, for example, the deactivate_expired_aws_iam_access_keys mod could be one step in a higher-level mod that enforces a set of access-related policies.
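A higher-level pipeline of that kind could look something like this. It is a sketch only: the parent pipeline name and its step name are hypothetical, and it assumes the parent lives in the same mod as the sample pipeline.

```hcl
# Hypothetical parent pipeline that reuses the sample as one step.
pipeline "enforce_access_policies" {

  param "slack_channel" {
    type    = string
    default = "iam_hygiene"
  }

  # Invoke the sample pipeline, passing its arguments through args.
  step "pipeline" "deactivate_expired_keys" {
    pipeline = pipeline.deactivate_expired_aws_iam_access_keys
    args = {
      slack_channel         = param.slack_channel
      aws_expire_after_days = 60
    }
  }

  # Additional access-related policy steps would be added here.
}
```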

Schedules, Events and Triggers

DevOps is filled with routine work, which Flowpipe is happy to do on a schedule. Just set up a schedule trigger to run a pipeline at regular intervals:

trigger "schedule" "daily_3pm" {
  # Minute 0 of hour 15: once a day at 3 pm.
  schedule = "0 15 * * *"
  pipeline = pipeline.daily_task
}
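The cron fields are, in order: minute, hour, day of month, month, and day of week. Pinning the minute to a fixed value makes a trigger fire once at that time rather than every minute of the hour. As a sketch, the sample pipeline from earlier could be scheduled weekly like this; the trigger name and the choice of schedule are assumptions.

```hcl
# Hypothetical trigger: run the key cleanup at 09:00 every Monday.
trigger "schedule" "weekly_key_cleanup" {
  schedule = "0 9 * * 1" # minute 0, hour 9, any day of month, any month, Monday
  pipeline = pipeline.deactivate_expired_aws_iam_access_keys

  # Arguments passed to the pipeline on each run.
  args = {
    slack_channel = "iam_hygiene"
  }
}
```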

Events are critical to the world of AWS scripting — we need to respond immediately to code pushes, infrastructure change, Slack messages, etc. So Flowpipe has an http trigger to handle incoming webhooks and run a pipeline:

trigger "http" "my_webhook" {
  pipeline = pipeline.my_pipeline
  args = {
    event = self.request_body
  }
}
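On the receiving side, the pipeline needs a matching event parameter. Here is a minimal sketch of what my_pipeline might look like, assuming the webhook sends a JSON body; the step and output names are hypothetical.

```hcl
# Hypothetical receiving pipeline for the webhook trigger.
pipeline "my_pipeline" {

  # Receives the raw request body passed in by the trigger's args.
  param "event" {
    type = string
  }

  # Decode the body, assuming the sender posts JSON.
  step "transform" "parsed_event" {
    value = jsondecode(param.event)
  }

  output "event" {
    value = step.transform.parsed_event.value
  }
}
```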

The sweet spot for AWS automation

To get started with Flowpipe: download the tool, follow the tutorial, peruse the library mods, and check out the samples. Then let us know how it goes!