【Terraform】Terraform ~ AWS MSK Connect ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/05/14/122215

から分冊。

今回は、AWS MSK Connect をTerraform で作成する

MSK = Managed Streaming for apache Kafka

Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516

目次

【1】公式ドキュメント
 1)MSK Connect
 2)MSK Connect Custom plugin
 3)MSK Connect worker configuration
【2】サンプル
 0)Github sample
 1)MSK Connect Custom plugin
 2)MSK Connector
 3)MSK Connect worker configuration

【1】公式ドキュメント

1)MSK Connect

Resource: aws_mskconnect_connector
https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_connector

2)MSK Connect Custom plugin

Resource: aws_mskconnect_custom_plugin
https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_custom_plugin

3)MSK Connect worker configuration

Resource: aws_mskconnect_worker_configuration
https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_worker_configuration

【2】サンプル

0)Github sample

MSK Connect
https://github.com/clowdhaus/terraform-aws-msk-kafka-cluster/tree/main/examples/connect

1)MSK Connect Custom plugin

https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_custom_plugin

resource "aws_s3_bucket" "demo_s3_bucket" {
  bucket = "your-s3-bucket-name"
}

resource "aws_s3_object" "update_msk_custom_plugin_object" {
  bucket = aws_s3_bucket.demo_s3_bucket.id
  key    = "sample/xxxx/debezium.zip"
  source = "debezium.zip"
}

# ★Main★
resource "aws_mskconnect_custom_plugin" "demo_msk_connect_custom_plugin" {
  name         = "demo-msk-connect-custom-plugin-debezium"
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = aws_s3_bucket.demo_s3_bucket.arn
      file_key   = aws_s3_object.update_msk_custom_plugin_object.key
    }
  }
}

2)MSK Connect

Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215

の「1)MSK Cluster」「2)MSK Custom plugin」および
上記の「1)MSK Custom plugin」が必要
 => 特に、MSK connector は、MSK clusterありきなので、そこは必須。

https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_connector

# IAM for MSK connector

# IAM role
# https://stackoverflow.com/collectives/aws/articles/75388630/deploying-a-kafka-connect-connector-on-amazon-msk-connect-using-terraform
resource "aws_iam_role" "demo_msk_connector_service_execution_role" {
  name = "demo-msk-connector-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "kafkaconnect.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  })
}

# IAM policy
# See https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-connect-service-execution-role.html  
resource "aws_iam_role_policy" "demo_iam_role_policy_msk_connector" {
  name = "demo-iam-role-policy-msk-connector"
  role = aws_iam_role.demo_msk_connector_service_execution_role.id

  # Terraform's "jsonencode" function converts a
  # Terraform expression result to valid JSON syntax.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "forCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "*" # cluster-arn
      },
      {
        Sid = "forTopicOfSink"
        Action = [
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a sink connector to read from
      },
      {
        Sid = "forTopicOfSource"
        Action = [
           "kafka-cluster:WriteData",
           "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a source connector to write to
      },
      {
        Sid = "forClusterAndTopic"
        Action = [
          "kafka-cluster:CreateTopic",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/__amazon_msk_connect_*
      },
      {
        Sid = "forMskGroup"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "*" # "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/__amazon_msk_connect_*",
      }
    ]
  })
}

# ★Main★
resource "aws_mskconnect_connector" "demo_mskconnect_connector" {
  name = "demo-msk-connect"
  description = "This is a main for this blog"

  kafkaconnect_version = "2.7.1"

  capacity {
    autoscaling {
      # The number of microcontroller units (MCUs)
      # Valid values: 1, 2, 4, 8. The default value is 1
      mcu_count        = 1
      # min/maxは、1-10の範囲で指定
      # min/maxは、min < max でなければならない(つまり同じ値もダメ)
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  # Connector Conguration
  connector_configuration = {
    "name" = "demo-msk-connect"
    "connector.class" = "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector"
    "tasks.max"       = "3"
    "topics"          = "demo-topic"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = aws_msk_cluster.demo_msk_cluster.bootstrap_brokers_tls

      vpc {
        security_groups = [aws_security_group.demo_msk_sg.id]
        subnets = [aws_subnet.subnet_az1.id, subnet_az2.example2.id, subnet_az2.example3.id]
      }
    }
  }

  kafka_cluster_client_authentication {
    # "NONE" or "IAM"
    authentication_type = "NONE"
  }

  # https://docs.aws.amazon.com/ja_jp/MSKC/latest/mskc/API_KafkaClusterEncryptionInTransit.html
  # Valid Values: PLAINTEXT | TLS
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.arn
      revision = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.latest_revision
    }
  }

  service_execution_role_arn = aws_iam_role.demo_msk_connector_service_execution_role.arn

  # To set timeout (Default is 20min)
  timeouts {
    create = "1h"
    update = "1h"
    delete = "1h"
  }
}

3)MSK Connect worker configuration

https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_worker_configuration

resource "aws_mskconnect_worker_configuration" "demo_mskconnect_worker_config" {
  name = "demo-mskconnect-worker-config"
  description = "For Demo"

  properties_file_content = <<EOT
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
EOT
}

# See "2)MSK Connect"
resource "aws_mskconnect_connector" "demo_mskconnect_connector" {
  name = "demo-msk-connect"
  description = "This is a main for this blog"

  # ...

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.arn
      revision = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.latest_revision
    }
  }
  # ★ See https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_connector#worker_configuration-configuration-block
  worker_configuration {
    arn      = aws_mskconnect_worker_configuration.demo_mskconnect_worker_config.arn
    revision = aws_mskconnect_worker_configuration.demo_mskconnect_worker_config.latest_revision
  }

  # ...
}

使用上の注意

* 

関連記事

Terraform ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/05/000224
Terraform ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/12/09/222057
Terraform ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/05/03/000000
Terraform ~ Terraformあれこれ ~
https://dk521123.hatenablog.com/entry/2023/05/15/205352
Terraform ~ AWS IAM ~
https://dk521123.hatenablog.com/entry/2023/04/12/214311
Terraform ~ AWS S3 ~
https://dk521123.hatenablog.com/entry/2023/04/09/104204
Terraform ~ AWS CloudWatch ~
https://dk521123.hatenablog.com/entry/2023/05/17/123335
Terraform ~ AWS KMS ~
https://dk521123.hatenablog.com/entry/2023/05/26/000000
Terraform ~ AWS Secrets Manager ~
https://dk521123.hatenablog.com/entry/2023/04/11/152801
Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215
AWS MSK Connect 内の 接続情報を設定を考える
https://dk521123.hatenablog.com/entry/2023/06/04/230737
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516