【Terraform】Configuring connection information in AWS MSK Connect

■ Introduction

https://dk521123.hatenablog.com/entry/2023/05/14/122215
https://dk521123.hatenablog.com/entry/2023/05/25/000000

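This post continues the two articles above.

This time I looked at the configuration part of AWS MSK Connect, in particular how to fetch
sensitive connection information (for a DB and so on) from Secrets Manager. This is a memo of what I found.

Creating everything by hand is tedious, so I wrote it as Terraform code.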

Table of contents

【1】Points to note for MSK Connect
【2】Points to note for AWS MSK with Secrets Manager
【3】Sample
 1) Preparation
 2) Terraform code (rough sketch)

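【1】Points to note for MSK Connect

[1] Connection information (*1) can be written directly into the connector configuration,
 but it is then displayed as-is in the AWS Management Console
 => *1: See "connection.user", "connection.password", etc.
  in the related article below

Kafka Connect ~ Connector configuration properties ~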
https://dk521123.hatenablog.com/entry/2023/06/02/011131

[2] The connector's IAM role needs the secretsmanager:GetSecretValue permission
 => Not surprising, since this is the permission for reading values from Secrets Manager
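
As a minimal sketch of point [2], the permission can be limited to a single secret instead of "*". The secret name, policy name, and role name below are assumptions chosen for illustration; replace them with your own.

```hcl
# Look up the secret by name (hypothetical name).
data "aws_secretsmanager_secret" "connector_credentials" {
  name = "demo-msk-connect-credentials"
}

# Allow reading only that one secret.
data "aws_iam_policy_document" "read_connector_secret" {
  statement {
    sid       = "ReadConnectorSecret"
    effect    = "Allow"
    actions   = ["secretsmanager:GetSecretValue"]
    resources = [data.aws_secretsmanager_secret.connector_credentials.arn]
  }
}

# Attach the policy inline to the connector's service execution role.
resource "aws_iam_role_policy" "read_connector_secret" {
  name   = "read-connector-secret"
  role   = "demo-msk-connector-role" # hypothetical role name
  policy = data.aws_iam_policy_document.read_connector_secret.json
}
```

If the secret is encrypted with a customer managed KMS key, the role also needs kms:Decrypt on that key.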

【2】Points to note for AWS MSK with Secrets Manager

* It is worth reading through the following official documentation first

https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-password.html

Excerpt:
~~~~~
* When you create a secret for an Amazon MSK cluster,
 note the following requirements.

 + For the secret type, choose "Other type of secrets" (e.g. API key).
 + The secret name must begin with the prefix AmazonMSK_.
 + You must either use an existing custom AWS KMS key or
 create a new custom AWS KMS key for the secret.
 Secrets Manager uses the default AWS KMS key
 for a secret by default.

 ! Important
 + A secret created with the default AWS KMS key
  cannot be used with an Amazon MSK cluster.
~~~~~
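
A minimal sketch of a secret that satisfies the requirements quoted above (AmazonMSK_ prefix, customer managed KMS key). The resource names and the secret name are assumptions. Note that the excerpt is about secrets used by an MSK cluster; it does not say whether the same naming rule applies to a secret that a connector reads through a config provider.

```hcl
# Customer managed KMS key (the default aws/secretsmanager key is not accepted).
resource "aws_kms_key" "msk_secret" {
  description = "Customer managed key for MSK secrets"
}

# Secret whose name starts with the required "AmazonMSK_" prefix.
resource "aws_secretsmanager_secret" "msk_cluster_secret" {
  name       = "AmazonMSK_demo_credentials" # hypothetical name
  kms_key_id = aws_kms_key.msk_secret.arn
}
```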

【3】Sample

* The following repository looks like a good base to work from.

https://github.com/entechlog/aws-examples/tree/master/aws-data-platform/terraform/modules/msk

1) Preparation

https://github.com/entechlog/aws-examples/tree/master/aws-data-platform/terraform/uploads/msk/connect/plugins

Using "snowflake-kafka-connector-1.8.0.zip" in the directory above as a reference, the plugin has to be packaged as a ZIP file beforehand
 => For where that ZIP is wired in, see the following.

https://github.com/entechlog/aws-examples/blob/d306364938976de89335c8787bce093953d13caa/aws-data-platform/terraform/modules/msk/msk_connect_plugin.tf#LL54C30-L54C30

* As for the required modules, download the package from the site below and
 ZIP it together with the dependency JAR files in its lib directory
 (in particular "kafka-config-provider-aws-0.1.2.jar")

https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws

Example list of JAR files, including dependencies

* aws-java-sdk-core-1.11.1025.jar
* aws-java-sdk-secretsmanager-1.11.1025.jar
* commons-logging-1.1.3.jar
* connect-utils-0.7.173.jar
* guava-31.1-jre.jar
* httpclient-4.5.13.jar
* httpcore-4.4.13.jar
* joda-time-2.8.1.jar
* kafka-config-provider-aws-0.1.2.jar
* kafka-connect-jdbc-10.7.1.jar
* The JDBC driver for the DB you use (e.g. snowflake-jdbc-3.13.28.jar)
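
The ZIP step can also be left to Terraform. The following is only a sketch using the archive_file data source from the hashicorp/archive provider. The directory layout (a "plugin" directory holding the JAR files listed above) and the output path are assumptions.

```hcl
terraform {
  required_providers {
    archive = {
      source = "hashicorp/archive"
    }
  }
}

# Zip the whole plugin directory, including every JAR under it.
data "archive_file" "msk_connect_plugin" {
  type        = "zip"
  source_dir  = "${path.module}/plugin"                # hypothetical directory with the JAR files
  output_path = "${path.module}/build/your-plugin.zip" # hypothetical output location
}
```

The generated file can then be passed to the S3 upload by setting source = data.archive_file.msk_connect_plugin.output_path on the aws_s3_object resource.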

2) Terraform code (rough sketch)

* This is a rough sketch rather than a working sample.
 => For the base, see the samples in the related articles below.

https://dk521123.hatenablog.com/entry/2023/05/14/122215
https://dk521123.hatenablog.com/entry/2023/05/25/000000

iam.tf

# IAM for MSK connector

# IAM role
# https://stackoverflow.com/collectives/aws/articles/75388630/deploying-a-kafka-connect-connector-on-amazon-msk-connect-using-terraform
resource "aws_iam_role" "demo_msk_connector_service_execution_role" {
  name = "demo-msk-connector-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "kafkaconnect.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  })
}

# IAM policy
# See https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-connect-service-execution-role.html  
resource "aws_iam_role_policy" "demo_iam_role_policy_msk_connector" {
  name = "demo-iam-role-policy-msk-connector"
  role = aws_iam_role.demo_msk_connector_service_execution_role.id

  # Terraform's "jsonencode" function converts a
  # Terraform expression result to valid JSON syntax.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "forCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "*" # cluster-arn
      },
      {
        Sid = "forTopicOfSink"
        Action = [
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a sink connector to read from
      },
      {
        Sid = "forTopicOfSource"
        Action = [
           "kafka-cluster:WriteData",
           "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a source connector to write to
      },
      {
        Sid = "forClusterAndTopic"
        Action = [
          "kafka-cluster:CreateTopic",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/__amazon_msk_connect_*
      },
      {
        Sid = "forMskGroup"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "*" # "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/__amazon_msk_connect_*",
      },
      # ★Key point★
      {
        Sid = "forSecretsManager"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      # Needed when the secret in Secrets Manager is encrypted with a KMS key
      {
        Sid = "forKmsOfSecretsManager"
        Action = [
          "kms:Decrypt",
          "kms:DescribeKey"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

vpc.tf

# For Network

# VPC
resource "aws_vpc" "vpc" {
  cidr_block = "192.168.0.0/22"
}

data "aws_availability_zones" "azs" {
  state = "available"
}

# Subnet

resource "aws_subnet" "subnet_az1" {
  availability_zone = data.aws_availability_zones.azs.names[0]
  cidr_block        = "192.168.0.0/24"
  vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az2" {
  availability_zone = data.aws_availability_zones.azs.names[1]
  cidr_block        = "192.168.1.0/24"
  vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az3" {
  availability_zone = data.aws_availability_zones.azs.names[2]
  cidr_block        = "192.168.2.0/24"
  vpc_id            = aws_vpc.vpc.id
}

# Security Group
resource "aws_security_group" "demo_msk_sg" {
  name        = "demo-msk-sg"
  vpc_id      = aws_vpc.vpc.id
  description = "Allow all traffic"

  ingress {
    description      = "All protocols from any address (demo only; restrict in real use)"
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  egress {
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  tags = {
    Name = "demo-msk-sg"
  }
}

kms.tf

resource "aws_kms_key" "demo_kms_key_for_msk" {
  description = "This KMS key is for MSK"

  tags = {
    Name = "demo-kms-key-for-msk"
  }
}

resource "aws_kms_alias" "demo_kms_for_msk" {
  name          = "alias/demo/msk/demo_msk"
  target_key_id = aws_kms_key.demo_kms_key_for_msk.key_id
}

secrets_manager.tf

// connector credentials

resource "aws_secretsmanager_secret" "demo_msk_connect_secret" {
  name = "/msk/connect/demo-msk-connect-credentials"
  kms_key_id = aws_kms_key.demo_kms_key_for_msk.key_id
}

# ★Store the connection information in Secrets Manager
resource "aws_secretsmanager_secret_version" "demo_msk_connect_secret_version" {
  secret_id     = aws_secretsmanager_secret.demo_msk_connect_secret.id
  secret_string = jsonencode({
    db_url = "jdbc:mysql://127.0.0.1:3306/sample",
    db_user_name = "user",
    db_password = "password"
  })
}

# Resource policy that lets the MSK service principal (kafka.amazonaws.com) read the secret
resource "aws_secretsmanager_secret_policy" "demo_msk_connect_secret_policy" {
  secret_arn = aws_secretsmanager_secret.demo_msk_connect_secret.arn
  policy     = <<POLICY
{
  "Version" : "2012-10-17",
  "Statement" : [ {
    "Sid": "AWSKafkaResourcePolicy",
    "Effect" : "Allow",
    "Principal" : {
      "Service" : "kafka.amazonaws.com"
    },
    "Action" : "secretsmanager:getSecretValue",
    "Resource" : "${aws_secretsmanager_secret.demo_msk_connect_secret.arn}"
  } ]
}
POLICY
}
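
The connection values above are hard-coded for illustration. A possible variation, sketched below, is to pass them in as input variables so that they do not appear in the .tf files. The variable names, resource names, and secret name are assumptions.

```hcl
# Hypothetical input variables; supply them via TF_VAR_* environment variables
# or a tfvars file that is kept out of version control.
variable "db_url" {
  type = string
}

variable "db_user_name" {
  type = string
}

variable "db_password" {
  type      = string
  sensitive = true # hides the value in plan/apply output
}

resource "aws_secretsmanager_secret" "connector_credentials" {
  name = "demo-connector-credentials" # hypothetical secret name
}

resource "aws_secretsmanager_secret_version" "connector_credentials" {
  secret_id = aws_secretsmanager_secret.connector_credentials.id
  secret_string = jsonencode({
    db_url       = var.db_url
    db_user_name = var.db_user_name
    db_password  = var.db_password
  })
}
```

Even with sensitive = true, the values are still written to the Terraform state, so the state itself has to be stored somewhere protected.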

msk.tf

resource "aws_msk_serverless_cluster" "demo_msk_serverless_cluster" {
  cluster_name = "demo-msk-serverless-cluster"
  description = "This is a demo."

  vpc_config {
    subnet_ids = [
      aws_subnet.subnet_az1.id,
      aws_subnet.subnet_az2.id,
      aws_subnet.subnet_az3.id,
    ]
    security_group_ids = [aws_security_group.demo_msk_sg.id]
  }

  client_authentication {
    sasl {
      iam {
        enabled = true
      }
    }
  }
}

mskconnect_custom_plugin.tf

resource "aws_s3_bucket" "demo_s3_bucket" {
  bucket = "your-s3-bucket-name"
}

resource "aws_s3_object" "update_msk_custom_plugin_object" {
  bucket = aws_s3_bucket.demo_s3_bucket.id
  key    = "sample/xxxx/your-plugin.zip"
  source = "your-plugin.zip"
}

resource "aws_mskconnect_custom_plugin" "demo_msk_connect_custom_plugin" {
  name         = "demo-msk-connect-custom-plugin"
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = aws_s3_bucket.demo_s3_bucket.arn
      file_key   = aws_s3_object.update_msk_custom_plugin_object.key
    }
  }
}

mskconnect.tf

resource "aws_mskconnect_connector" "demo_mskconnect_connector" {
  name = "demo-msk-connect"
  description = "This is a main for this blog"

  kafkaconnect_version = "2.7.1"

  capacity {
    autoscaling {
      # The number of microcontroller units (MCUs)
      # Valid values: 1, 2, 4, 8. The default value is 1
      mcu_count = 1
      # min/max worker counts must be within the range 1-10
      # min must be strictly less than max (equal values are rejected)
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  # Connector configuration
  # "$${" is Terraform's escape for a literal "${", so the placeholders reach
  # Kafka Connect as-is and are resolved there by the config provider.
  # The secret name is interpolated so that it always matches secrets_manager.tf.
  connector_configuration = {
    "name"                = "demo-msk-connect"
    "connector.class"     = "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector"
    "tasks.max"           = "3"
    "topics"              = "demo-topic"
    "connection.url"      = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_url}"
    "connection.user"     = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_user_name}"
    "connection.password" = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_password}"
    "config.providers.secretManager.class" = "com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider"
    "config.providers" = "secretManager"
    "config.providers.secretManager.param.aws.region" = "us-west-2" # !! Important !!
  }

  kafka_cluster {
    apache_kafka_cluster {
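      # Points at the serverless cluster defined in msk.tf
      # (assumes an AWS provider version that exports bootstrap_brokers_sasl_iam)
      bootstrap_servers = aws_msk_serverless_cluster.demo_msk_serverless_cluster.bootstrap_brokers_sasl_iam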

      vpc {
        security_groups = [aws_security_group.demo_msk_sg.id]
        subnets = [aws_subnet.subnet_az1.id, aws_subnet.subnet_az2.id, aws_subnet.subnet_az3.id]
      }
    }
  }

  kafka_cluster_client_authentication {
    # "NONE" or "IAM"; the serverless cluster in msk.tf uses IAM authentication
    authentication_type = "IAM"
  }

  # https://docs.aws.amazon.com/ja_jp/MSKC/latest/mskc/API_KafkaClusterEncryptionInTransit.html
  # Valid Values: PLAINTEXT | TLS
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.arn
      revision = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.latest_revision
    }
  }

  service_execution_role_arn = aws_iam_role.demo_msk_connector_service_execution_role.arn
}

References

https://www.entechlog.com/blog/kafka/integrating-msk-connect-with-msk/
https://lenses.io/blog/2023/04/how-to-protect-and-rotate-secrets-for-apache-kafka-connectors-aws-secret-manager/
https://awstip.com/aws-msk-connect-the-all-story-part-3-3-connector-configuration-1417f51900ce

Related articles

Terraform ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/05/000224
Terraform ~ Getting started ~
https://dk521123.hatenablog.com/entry/2019/12/09/222057
Terraform ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/05/03/000000
Terraform ~ Terraform odds and ends ~
https://dk521123.hatenablog.com/entry/2023/05/15/205352
Terraform ~ AWS IAM ~
https://dk521123.hatenablog.com/entry/2023/04/12/214311
Terraform ~ AWS S3 ~
https://dk521123.hatenablog.com/entry/2023/04/09/104204
Terraform ~ AWS CloudWatch ~
https://dk521123.hatenablog.com/entry/2023/05/17/123335
Terraform ~ AWS KMS ~
https://dk521123.hatenablog.com/entry/2023/05/26/000000
Terraform ~ AWS Secrets Manager ~
https://dk521123.hatenablog.com/entry/2023/04/11/152801
Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215
Terraform ~ AWS MSK Connect ~
https://dk521123.hatenablog.com/entry/2023/05/25/000000
Amazon MSK ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ Basics / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Kafka Connect ~ Connector configuration properties ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131