// migrator.js

// Include configuration
const config = require('./config')

// Include the AWS SDK
const AWS = require('aws-sdk')

// Module-level AWS service clients, constructed once with no explicit options.
// ECS is used to look up and run the migration task definition; CloudFormation
// is used to describe stacks.
const ECS = new AWS.ECS()
const CloudFormation = new AWS.CloudFormation()

// Include async and slack-notify
const async = require('async')
const slack = require('slack-notify')(config.slack.webhook_url)

/**
 * Look up the ECS task definition registered for a stack's DB migration.
 * The family is expected to be named "<stack>-db-migrate"; ACTIVE definitions are
 * requested in DESC order, limited to a single result.
 * @param {string} stack Name of the stack whose migration task should be located
 * @returns {Promise<string>} Resolves with the first task definition ARN returned;
 *   rejects with the ECS error, or with "Migration task not found" when no ARN comes back
 */
const findMigrationTask = (stack) => new Promise((resolve, reject) => {
  console.log(`Finding migration task for stack: ${stack}`)

  // Query for the migration task definition family
  const query = {
    familyPrefix: `${stack}-db-migrate`,
    maxResults: 1,
    sort: 'DESC',
    status: 'ACTIVE'
  }

  // Callback for the ECS listing: settle the promise from its outcome
  const onListed = (err, data) => {
    if (err) {
      return reject(err)
    }

    console.log('Migration task definitions data', data)

    const arns = data.taskDefinitionArns
    const hasMatch = Boolean(arns) && arns.length > 0
    if (!hasMatch) {
      return reject(new Error('Migration task not found'))
    }

    resolve(arns[0])
  }

  ECS.listTaskDefinitions(query, onListed)
})

/**
 * Promise adapter around CloudFormation.describeStacks for a single stack.
 * @param {string} stack Value passed as StackName to describeStacks
 * @returns {Promise<object>} Resolves with the first entry of the returned Stacks list;
 *   rejects with the CloudFormation error, or with "No stack found" when the list is
 *   missing or empty
 */
const describeStack = (stack) => new Promise((resolve, reject) => {
  console.log(`Describing stack: ${stack}`)

  // Callback for describeStacks: settle the promise from its outcome
  const onDescribed = (err, data) => {
    if (err) {
      return reject(err)
    }

    console.log('Stack data', data)

    const stacks = data.Stacks
    const hasStack = Boolean(stacks) && stacks.length > 0
    if (!hasStack) {
      return reject(new Error('No stack found'))
    }

    resolve(stacks[0])
  }

  CloudFormation.describeStacks({ StackName: stack }, onDescribed)
})

/**
 * Describe a service stack, read its "ParentClusterStack" parameter, and describe
 * the stack that parameter names.
 * @param {string} stack Name of the service stack
 * @returns {Promise<object>} Resolves with the description of the parent cluster stack;
 *   rejects with "No Parent Cluster Stack" when the service stack has no such parameter
 *   (including when it has no parameter list at all), or with whatever error
 *   describeStack rejects with
 */
const findClusterStack = (stack) => {
  console.log(`Finding parent cluster stack for stack: ${stack}`)

  // describeStack already returns a promise, so chain on it directly
  return describeStack(stack).then(stackData => {
    // Tolerate a stack description without a Parameters list
    const parameters = stackData.Parameters || []
    const parentParam = parameters.find(p => p.ParameterKey === 'ParentClusterStack')

    if (!parentParam) {
      throw new Error('No Parent Cluster Stack')
    }

    const parentClusterStackName = parentParam.ParameterValue

    // Debug logs
    console.log(`Found parent cluster stack: ${parentClusterStackName}`)

    // Resolve with the parent cluster stack details (or reject with its lookup error)
    return describeStack(parentClusterStackName)
  })
}

/**
 * Describe a service stack and read the ECS cluster name from its "ClusterName" output.
 * @param {string} stack Name of the service stack
 * @returns {Promise<string>} Resolves with the OutputValue of the "ClusterName" output;
 *   rejects with "Unable to find cluster name for stack: <stack>" when that output is
 *   absent (including when the stack has no output list at all), or with whatever error
 *   describeStack rejects with
 */
const findClusterName = (stack) => {
  console.log(`Finding parent cluster name for stack: ${stack}`)

  // describeStack already returns a promise, so chain on it directly
  return describeStack(stack).then(stackData => {
    // Tolerate a stack description without an Outputs list
    const outputs = stackData.Outputs || []
    const clusterOutput = outputs.find(o => o.OutputKey === 'ClusterName')

    if (!clusterOutput) {
      // TODO: Additionally search using ParentClusterStack param?
      throw new Error(`Unable to find cluster name for stack: ${stack}`)
    }

    const parentClusterName = clusterOutput.OutputValue

    // Debug logs
    console.log(`Found parent cluster name: ${parentClusterName}`)

    return parentClusterName
  })
}

/**
 * Start the migration task on the given cluster and announce it on Slack.
 * @param {string} cluster ECS cluster to run the task on
 * @param {string} taskDefinition Task definition to run (ARN or family[:revision])
 * @returns {Promise<void>} Resolves once the Slack notification has been sent; rejects
 *   with the ECS error, with an Error listing the reasons when ECS reports `failures`
 *   for the run request, or with the Slack send error
 */
const runMigration = (cluster, taskDefinition) => {
  return new Promise(
    (resolve, reject) => {
      console.log(`Running task: ${taskDefinition} on cluster: ${cluster}`)

      const runParams = {
        cluster,
        taskDefinition
      }

      ECS.runTask(runParams, (err, data) => {
        if (err) {
          reject(err)
          return
        }

        console.log('Running ECS task data', data)

        // runTask can call back without an error while still reporting that the task
        // could not be started; do not announce a migration that is not running
        const failures = (data && data.failures) || []
        if (failures.length > 0) {
          const reasons = failures.map(f => f.reason || 'unknown reason').join(', ')
          reject(new Error(`Failed to start migration task ${taskDefinition} on cluster ${cluster}: ${reasons}`))
          return
        }

        // Strip everything up to "task-definition/" so the message shows family:revision
        const arnMarker = 'task-definition/'
        const taskName = taskDefinition.includes(arnMarker) ? taskDefinition.split(arnMarker)[1] : taskDefinition

        // Build Slack notification object (inherits from config.slack.defaults)
        const notification = Object.assign({}, config.slack.defaults, {
          attachments: [
            {
              title: 'Running DB Migration',
              color: 'good',
              fallback: 'Running DB Migration',
              text: `RUNNING: ${taskName}\nCLUSTER: ${cluster}`
            }
          ]
        })

        // Send the notification to slack
        slack.send(notification, (err) => {
          if (err) {
            reject(err)
          } else {
            resolve()
          }
        })
      })
    }
  )
}

/**
 * Try to run the DB migration for a stack, failing gracefully when the stack has
 * no migration task or its cluster name cannot be determined.
 * @param {string} stack Name of the stack to migrate
 * @returns {Promise<*>} Resolves with runMigration's result once the task was started;
 *   resolves with the lookup error value when findMigrationTask fails; resolves with
 *   undefined when findClusterName fails (the failure is logged); rejects only when
 *   runMigration itself rejects
 */
const initMigration = (stack) => new Promise((resolve, reject) => {
  // Cluster lookup failed: log it and finish without an error
  const skipMigration = (err) => {
    console.log('Failed to find clust name for stack', stack, err)
    resolve()
  }

  // Cluster located: start the migration task; a failure here is a real error
  const startOn = (migrationTaskArn) => (parentClusterName) => {
    console.log('Found parent cluster name:', parentClusterName)

    runMigration(parentClusterName, migrationTaskArn).then(resolve, reject)
  }

  // Migration task definition located: find which cluster to run it on
  const onTaskFound = (migrationTaskArn) => {
    console.log(`Found migration task: ${migrationTaskArn}`)

    findClusterName(stack)
      .then(startOn(migrationTaskArn))
      .catch(skipMigration)
  }

  // A failed task lookup resolves (with the error as value) instead of rejecting
  findMigrationTask(stack)
    .then(onTaskFound)
    .catch(resolve)
})

/**
 * Handle a stack change event (as published from SNS topic): for every parsed message
 * that reports UPDATE_COMPLETE on the stack itself, attempt the stack's DB migration.
 * @param {object} event Event whose `parsed.messages` holds the parsed SNS messages
 * @returns {Promise<object>} Resolves with the same event after all messages were
 *   handled; rejects with "No messages to be sent." when there are no parsed messages,
 *   or with the first error reported while processing a message
 */
module.exports.stackChange = (event) => new Promise((resolve, reject) => {
  // Ensure the SNS records have been parsed into messages
  const messages = event.parsed.messages
  if (!messages || messages.length === 0) {
    reject(new Error('No messages to be sent.'))
    return
  }

  console.log('Messages:', JSON.stringify(messages, null, 2))

  // True when the message is about the stack itself (not one of its resources)
  // and reports a completed update
  const shouldMigrate = (msg) => {
    const isStackMessage = (msg.ResourceType === 'AWS::CloudFormation::Stack' && msg.LogicalResourceId === msg.StackName)
    return isStackMessage && msg.ResourceStatus === 'UPDATE_COMPLETE'
  }

  // Per-message worker for async.each
  const handleMessage = (msg, next) => {
    if (!shouldMigrate(msg)) {
      console.log('Not migrating for msg:', msg)
      next()
      return
    }

    console.log('Running migration for stack: ', msg.StackName)

    // Attempt migration for stack
    initMigration(msg.StackName)
      .then(resp => {
        console.log('Migration init:', resp)

        next()
      })
      .catch(migrationErr => next(migrationErr))
  }

  // Settle the outer promise once every message was handled (or one failed)
  const onAllHandled = (err) => {
    if (err) {
      reject(err)
      return
    }
    resolve(event)
  }

  async.each(messages, handleMessage, onAllHandled)
})