Source code for webhook2lambda2sqs.terraform_runner

The latest version of this package is available at:

Copyright 2016 Jason Antman <> <>

    This file is part of webhook2lambda2sqs, also known as webhook2lambda2sqs.

    webhook2lambda2sqs is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    webhook2lambda2sqs is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with webhook2lambda2sqs.  If not, see <>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.

Jason Antman <> <>

import logging
from webhook2lambda2sqs.utils import run_cmd
import re
import json

logger = logging.getLogger(__name__)


[docs]class TerraformRunner(object): def __init__(self, config, tf_path): """ Initialize the Terraform command runner. :param config: program configuration :type config: :py:class:`~.Config` :param tf_path: path to terraform binary :type tf_path: str """ self.config = config self.tf_path = tf_path # if we fail getting the version, assume newest self.tf_version = (999, 999, 999) self._validate()
[docs] def _validate(self): """ Confirm that we can run terraform (by calling its version action) and then validate the configuration. """ try: out = self._run_tf('version') except: raise Exception('ERROR: executing \'%s version\' failed; is ' 'terraform installed and is the path to it (%s) ' 'correct?' % (self.tf_path, self.tf_path)) res ='Terraform v(\d+)\.(\d+)\.(\d+)', out) if res is None: logger.error('Unable to determine terraform version; will not ' 'validate config. Note that this may cause problems ' 'when using older Terraform versions. This program ' 'requires Terraform >= 0.6.16.') return self.tf_version = ( int(, int(, int( ) logger.debug('Terraform version: %s', self.tf_version) if self.tf_version < (0, 6, 16): raise Exception('This program requires Terraform >= 0.6.16, as ' 'that version introduces a bug fix for working ' 'with api_gateway_integration_response resources; ' 'see:' '/5893') try: self._run_tf('validate', ['.']) except Exception as ex: logger.critical("Terraform config validation failed. " "This is almost certainly a bug in " "webhook2lambda2sqs; please re-run with '-vv' and " "open a bug at <" "webhook2lambda2sqs/issues>. Exception: %s", ex) raise Exception( 'ERROR: Terraform config validation failed: %s' % ex )
[docs] def _args_for_remote(self): """ Generate arguments for 'terraform remote config'. Return None if not present in configuration. :return: list of args for 'terraform remote config' or None :rtype: :std:term:`list` """ conf = self.config.get('terraform_remote_state') if conf is None: return None args = ['-backend=%s' % conf['backend']] for k, v in sorted(conf['config'].items()): args.append('-backend-config="%s=%s"' % (k, v)) return args
[docs] def _set_remote(self, stream=False): """ Call :py:meth:`~._args_for_remote`; if the return value is not None, execute 'terraform remote config' with those arguments and ensure it exits 0. :param stream: whether or not to stream TF output in realtime :type stream: bool """ args = self._args_for_remote() if args is None: logger.debug('_args_for_remote() returned None; not configuring ' 'terraform remote') return logger.warning('Setting terraform remote config: %s', ' '.join(args)) args = ['config'] + args self._run_tf('remote', cmd_args=args, stream=stream)'Terraform remote configured.')
[docs] def _run_tf(self, cmd, cmd_args=[], stream=False): """ Run a single terraform command via :py:func:`~.utils.run_cmd`; raise exception on non-zero exit status. :param cmd: terraform command to run :type cmd: str :param cmd_args: arguments to command :type cmd_args: :std:term:`list` :return: command output :rtype: str :raises: Exception on non-zero exit """ args = [self.tf_path, cmd] + cmd_args arg_str = ' '.join(args)'Running terraform command: %s', arg_str) out, retcode = run_cmd(arg_str, stream=stream) if retcode != 0: logger.critical('Terraform command (%s) failed with exit code ' '%d:\n%s', arg_str, retcode, out) raise Exception('terraform %s failed' % cmd) return out
[docs] def plan(self, stream=False): """ Run a 'terraform plan' :param stream: whether or not to stream TF output in realtime :type stream: bool """ self._setup_tf(stream=stream) args = ['-input=false', '-refresh=true', '.'] logger.warning('Running terraform plan: %s', ' '.join(args)) out = self._run_tf('plan', cmd_args=args, stream=stream) if stream: logger.warning('Terraform plan finished successfully.') else: logger.warning("Terraform plan finished successfully:\n%s", out)
[docs] def _taint_deployment(self, stream=False): """ Run 'terraform taint aws_api_gateway_deployment.depl' to taint the deployment resource. This is a workaround for :param stream: whether or not to stream TF output in realtime :type stream: bool """ args = ['aws_api_gateway_deployment.depl'] logger.warning('Running terraform taint: %s as workaround for ' '<>', ' '.join(args)) out = self._run_tf('taint', cmd_args=args, stream=stream) if stream: logger.warning('Terraform taint finished successfully.') else: logger.warning("Terraform taint finished successfully:\n%s", out)
[docs] def apply(self, stream=False): """ Run a 'terraform apply' :param stream: whether or not to stream TF output in realtime :type stream: bool """ self._setup_tf(stream=stream) try: self._taint_deployment(stream=stream) except Exception: pass args = ['-input=false', '-refresh=true', '.'] logger.warning('Running terraform apply: %s', ' '.join(args)) out = self._run_tf('apply', cmd_args=args, stream=stream) if stream: logger.warning('Terraform apply finished successfully.') else: logger.warning("Terraform apply finished successfully:\n%s", out) self._show_outputs()
[docs] def _show_outputs(self): """ Print the terraform outputs. """ outs = self._get_outputs() print("\n\n" + '=> Terraform Outputs:') for k in sorted(outs): print('%s = %s' % (k, outs[k]))
[docs] def _get_outputs(self): """ Return a dict of the terraform outputs. :return: dict of terraform outputs :rtype: dict """ if self.tf_version >= (0, 7, 0): logger.debug('Running: terraform output') res = self._run_tf('output', cmd_args=['-json']) outs = json.loads(res.strip()) res = {} for k in outs.keys(): if isinstance(outs[k], type({})): res[k] = outs[k]['value'] else: res[k] = outs[k] logger.debug('Terraform outputs: %s', res) return res logger.debug('Running: terraform output') res = self._run_tf('output') outs = {} for line in res.split("\n"): line = line.strip() if line == '': continue parts = line.split(' = ', 1) outs[parts[0]] = parts[1] logger.debug('Terraform outputs: %s', outs) return outs
[docs] def destroy(self, stream=False): """ Run a 'terraform destroy' :param stream: whether or not to stream TF output in realtime :type stream: bool """ self._setup_tf(stream=stream) args = ['-refresh=true', '-force', '.'] logger.warning('Running terraform destroy: %s', ' '.join(args)) out = self._run_tf('destroy', cmd_args=args, stream=stream) if stream: logger.warning('Terraform destroy finished successfully.') else: logger.warning("Terraform destroy finished successfully:\n%s", out)
[docs] def _setup_tf(self, stream=False): """ Setup terraform; either 'remote config' or 'init' depending on version. """ if self.tf_version < (0, 9, 0): self._set_remote(stream=stream) return self._run_tf('init', stream=stream)'Terraform initialized')