■ Introduction
https://dk521123.hatenablog.com/entry/2023/05/14/122215
https://dk521123.hatenablog.com/entry/2023/05/25/000000
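This is a continuation of the posts above.
This time I looked into the configuration side of AWS MSK Connect, in particular how to fetch sensitive connection details (for a DB and so on) from Secrets Manager, and these are my notes on what I found.
Creating everything by hand is tedious, so I wrote it as Terraform code.
Table of contents
【1】Caveats for MSK Connect
【2】Notes on AWS MSK with Secrets Manager
【3】Sample
 1) Preparation
 2) Terraform code (sketch)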
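【1】Caveats for MSK Connect
[1] Connection details (*1) can be written directly into the connector configuration, but they are then displayed in the AWS Management Console
 => *1: see "connection.user", "connection.password", etc. in the related article below
Kafka Connect ~ Connector configuration properties ~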
https://dk521123.hatenablog.com/entry/2023/06/02/011131
[2] The connector's IAM role needs the secretsmanager:GetSecretValue permission
 => This is the permission for reading values from Secrets Manager, so it is only natural
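The permission in [2] can be sketched as a policy scoped to a single secret rather than to every secret in the account. This is only an illustration under assumptions: the variables `secret_arn` and `connector_role_name` and the resource names are hypothetical placeholders, not names used elsewhere in this post.

```hcl
# Hypothetical inputs: the ARN of the secret and the name of the
# connector's service execution role.
variable "secret_arn" {
  type = string
}

variable "connector_role_name" {
  type = string
}

# Allow reading exactly one secret.
data "aws_iam_policy_document" "example_secret_read" {
  statement {
    sid       = "forSecretsManager"
    effect    = "Allow"
    actions   = ["secretsmanager:GetSecretValue"]
    resources = [var.secret_arn]
  }
}

# Attach the policy to the connector's role.
resource "aws_iam_role_policy" "example_secret_read" {
  name   = "example-secret-read"
  role   = var.connector_role_name
  policy = data.aws_iam_policy_document.example_secret_read.json
}
```

If the secret is encrypted with a customer managed KMS key, the role also needs kms:Decrypt on that key.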
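【2】Notes on AWS MSK with Secrets Manager
* It is probably worth reading the official documentation below once (the page describes secrets used as sign-in credentials for an MSK cluster).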
https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-password.html
Excerpt
~~~~~
* When you create a secret for your Amazon MSK cluster, note the following requirements.
 + Choose "Other type of secrets (e.g. API key)" as the secret type.
 + The secret name must begin with AmazonMSK_.
 + You must either use an existing custom AWS KMS key or create a new custom AWS KMS key for the secret.
   Secrets Manager uses the default AWS KMS key for a secret by default.
 ! Important
 + A secret created with the default AWS KMS key cannot be used with an Amazon MSK cluster.
~~~~~
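The requirements in the excerpt could be expressed in Terraform roughly as follows. This is a minimal sketch, not a verified configuration: the resource names and the secret name suffix are made up for illustration, and only the `AmazonMSK_` prefix and the use of a non-default KMS key come from the excerpt.

```hcl
# Customer managed KMS key; the default Secrets Manager key is not accepted
# for secrets associated with an MSK cluster.
resource "aws_kms_key" "example_msk_secret_key" {
  description = "Customer managed key for an MSK secret (example)"
}

resource "aws_secretsmanager_secret" "example_msk_secret" {
  # The name must begin with "AmazonMSK_" (the suffix here is hypothetical).
  name       = "AmazonMSK_example_credentials"
  kms_key_id = aws_kms_key.example_msk_secret_key.key_id
}
```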
【3】Sample
* The following looks like a good reference to build from.
https://github.com/entechlog/aws-examples/tree/master/aws-data-platform/terraform/modules/msk
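1) Preparation
* Using "snowflake-kafka-connector-1.8.0.zip" in the repository above as a reference, the plugin has to be packaged as a ZIP beforehand
 => For how it is configured, refer to the following.
* The required modules can be downloaded from the site below; the contents of lib (in particular "kafka-config-provider-aws-0.1.2.jar") have to be zipped together with the JAR files they depend on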
https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws
Example list of JAR files, including dependencies
* aws-java-sdk-core-1.11.1025.jar
* aws-java-sdk-secretsmanager-1.11.1025.jar
* commons-logging-1.1.3.jar
* connect-utils-0.7.173.jar
* guava-31.1-jre.jar
* httpclient-4.5.13.jar
* httpcore-4.4.13.jar
* joda-time-2.8.1.jar
* kafka-config-provider-aws-0.1.2.jar
* kafka-connect-jdbc-10.7.1.jar
* The driver for the DB you are using (e.g. snowflake-jdbc-3.13.28.jar)
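If you would rather not build the ZIP by hand, the packaging step could be sketched with the `archive_file` data source from the hashicorp/archive provider. This is an assumption-laden sketch: the `plugin` directory (which would hold the JAR files listed above) and the output file name are hypothetical, and the directory layout MSK Connect expects inside the ZIP is not verified here.

```hcl
terraform {
  required_providers {
    archive = {
      source = "hashicorp/archive"
    }
  }
}

# Zip everything under ./plugin (hypothetical directory holding the JARs).
data "archive_file" "example_plugin_zip" {
  type        = "zip"
  source_dir  = "${path.module}/plugin"
  output_path = "${path.module}/your-plugin.zip"
}
```

The resulting `data.archive_file.example_plugin_zip.output_path` could then be used as the `source` of the S3 object that holds the plugin.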
2) Terraform code (sketch)
* More a sketch than a working sample.
 => It is based on the samples in the related articles below.
https://dk521123.hatenablog.com/entry/2023/05/14/122215
https://dk521123.hatenablog.com/entry/2023/05/25/000000
iam.tf
# IAM for MSK connector

# IAM role
# https://stackoverflow.com/collectives/aws/articles/75388630/deploying-a-kafka-connect-connector-on-amazon-msk-connect-using-terraform
resource "aws_iam_role" "demo_msk_connector_service_execution_role" {
  name = "demo-msk-connector-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "kafkaconnect.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

# IAM policy
# See https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/msk-connect-service-execution-role.html
resource "aws_iam_role_policy" "demo_iam_role_policy_msk_connector" {
  name = "demo-iam-role-policy-msk-connector"
  role = aws_iam_role.demo_msk_connector_service_execution_role.id

  # Terraform's "jsonencode" function converts a
  # Terraform expression result to valid JSON syntax.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "forCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "*" # cluster-arn
      },
      {
        Sid = "forTopicOfSink"
        Action = [
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a sink connector to read from
      },
      {
        Sid = "forTopicOfSource"
        Action = [
          "kafka-cluster:WriteData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # ARN of the topic that you want a source connector to write to
      },
      {
        Sid = "forClusterAndTopic"
        Action = [
          "kafka-cluster:CreateTopic",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic"
        ]
        Effect   = "Allow"
        Resource = "*" # arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/__amazon_msk_connect_*
      },
      {
        Sid = "forMskGroup"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "*" # arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/__amazon_msk_connect_*
      },
      # ★ Note this one ★
      {
        Sid = "forSecretsManager"
        Action = [
          "secretsmanager:GetSecretValue"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      # Needed when the secret in Secrets Manager is encrypted with a KMS key
      {
        Sid = "forKmsOfSecretsManager"
        Action = [
          "kms:Decrypt",
          "kms:DescribeKey"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}
vpc.tf
# For Network

# VPC
resource "aws_vpc" "vpc" {
  cidr_block = "192.168.0.0/22"
}

data "aws_availability_zones" "azs" {
  state = "available"
}

# Subnet
resource "aws_subnet" "subnet_az1" {
  availability_zone = data.aws_availability_zones.azs.names[0]
  cidr_block        = "192.168.0.0/24"
  vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az2" {
  availability_zone = data.aws_availability_zones.azs.names[1]
  cidr_block        = "192.168.1.0/24"
  vpc_id            = aws_vpc.vpc.id
}

resource "aws_subnet" "subnet_az3" {
  availability_zone = data.aws_availability_zones.azs.names[2]
  cidr_block        = "192.168.2.0/24"
  vpc_id            = aws_vpc.vpc.id
}

# Security Group
resource "aws_security_group" "demo_msk_sg" {
  name        = "demo-msk-sg"
  vpc_id      = aws_vpc.vpc.id
  description = "Allow all traffic"

  # Demo only: allows all inbound traffic
  ingress {
    description      = "All traffic"
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  egress {
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  tags = {
    Name = "demo-msk-sg"
  }
}
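The security group above opens everything, which is fine for a demo but not much else. As a hedged sketch of a tighter variant, the group below only accepts Kafka traffic from members of the same group. It assumes IAM authentication, for which MSK listens on port 9098; the resource name is hypothetical, and the port would need adjusting for other authentication types.

```hcl
resource "aws_security_group" "example_msk_restricted_sg" {
  name   = "example-msk-restricted-sg"
  vpc_id = aws_vpc.vpc.id

  # Only resources attached to this same security group may connect.
  ingress {
    description = "Kafka (IAM auth) from members of this security group"
    from_port   = 9098
    to_port     = 9098
    protocol    = "tcp"
    self        = true
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}
```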
kms.tf
resource "aws_kms_key" "demo_kms_key_for_msk" {
  description = "This KMS key is for MSK"
  tags = {
    Name = "demo-kms-key-for-msk"
  }
}

resource "aws_kms_alias" "demo_kms_for_msk" {
  name          = "alias/demo/msk/demo_msk"
  target_key_id = aws_kms_key.demo_kms_key_for_msk.key_id
}
secrets_manager.tf
# connector credentials
resource "aws_secretsmanager_secret" "demo_msk_connect_secret" {
  name       = "/msk/connect/demo-msk-connect-credentials"
  kms_key_id = aws_kms_key.demo_kms_key_for_msk.key_id
}

# ★ Store the connection details in Secrets Manager
resource "aws_secretsmanager_secret_version" "demo_msk_connect_secret_version" {
  secret_id = aws_secretsmanager_secret.demo_msk_connect_secret.id
  secret_string = jsonencode({
    db_url       = "jdbc:mysql://127.0.0.1:3306/sample"
    db_user_name = "user"
    db_password  = "password"
  })
}

# Grant access to the secret through a resource policy
# (the connector itself reads the secret via its service execution role in iam.tf)
resource "aws_secretsmanager_secret_policy" "demo_msk_connect_secret_policy" {
  secret_arn = aws_secretsmanager_secret.demo_msk_connect_secret.arn
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSKafkaResourcePolicy"
        Effect = "Allow"
        Principal = {
          Service = "kafka.amazonaws.com"
        }
        Action   = "secretsmanager:GetSecretValue"
        Resource = aws_secretsmanager_secret.demo_msk_connect_secret.arn
      }
    ]
  })
}
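Hard-coding the password in a .tf file somewhat defeats the purpose, so here is a minimal sketch of passing the values in as variables instead. The variable names and the resource name are hypothetical, and this block would take the place of the secret version resource above rather than sit next to it. Note that the values still end up in the Terraform state, so the state itself has to be protected.

```hcl
variable "db_url" {
  type = string
}

variable "db_user_name" {
  type = string
}

variable "db_password" {
  type      = string
  sensitive = true # keeps the value out of plan/apply output
}

resource "aws_secretsmanager_secret_version" "example_secret_version_from_vars" {
  secret_id = aws_secretsmanager_secret.demo_msk_connect_secret.id
  secret_string = jsonencode({
    db_url       = var.db_url
    db_user_name = var.db_user_name
    db_password  = var.db_password
  })
}
```

The values could then be supplied through `TF_VAR_db_password` and similar environment variables instead of being committed to the repository.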
msk.tf
resource "aws_msk_serverless_cluster" "demo_msk_serverless_cluster" {
  cluster_name = "demo-msk-serverless-cluster"
  # aws_msk_serverless_cluster has no "description" argument, so it is omitted here

  vpc_config {
    subnet_ids = [
      aws_subnet.subnet_az1.id,
      aws_subnet.subnet_az2.id,
      aws_subnet.subnet_az3.id,
    ]
    security_group_ids = [aws_security_group.demo_msk_sg.id]
  }

  client_authentication {
    sasl {
      iam {
        enabled = true
      }
    }
  }
}
mskconnect_custom_plugin.tf
resource "aws_s3_bucket" "demo_s3_bucket" {
  bucket = "your-s3-bucket-name"
}

resource "aws_s3_object" "update_msk_custom_plugin_object" {
  bucket = aws_s3_bucket.demo_s3_bucket.id
  key    = "sample/xxxx/your-plugin.zip"
  source = "your-plugin.zip"
}

resource "aws_mskconnect_custom_plugin" "demo_msk_connect_custom_plugin" {
  name         = "demo-msk-connect-custom-plugin"
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = aws_s3_bucket.demo_s3_bucket.arn
      file_key   = aws_s3_object.update_msk_custom_plugin_object.key
    }
  }
}
mskconnect.tf
resource "aws_mskconnect_connector" "demo_mskconnect_connector" {
  name        = "demo-msk-connect"
  description = "This is a main for this blog"

  kafkaconnect_version = "2.7.1"

  capacity {
    autoscaling {
      # The number of MSK Connect Units (MCUs) per worker
      # Valid values: 1, 2, 4, 8. The default value is 1
      mcu_count = 1
      # min/max must be within the range 1-10
      # min/max must satisfy min < max (so equal values are not allowed either)
      min_worker_count = 1
      max_worker_count = 2

      scale_in_policy {
        cpu_utilization_percentage = 20
      }

      scale_out_policy {
        cpu_utilization_percentage = 80
      }
    }
  }

  # Connector Configuration
  # "$${" is Terraform's escape for a literal "${"; the inner ${...} inserts the secret name
  # so that the reference matches the name defined in secrets_manager.tf
  connector_configuration = {
    "name"                = "demo-msk-connect"
    "connector.class"     = "com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector"
    "tasks.max"           = "3"
    "topics"              = "demo-topic"
    "connection.url"      = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_url}"
    "connection.user"     = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_user_name}"
    "connection.password" = "$${secretManager:${aws_secretsmanager_secret.demo_msk_connect_secret.name}:db_password}"

    "config.providers.secretManager.class"            = "com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider"
    "config.providers"                                = "secretManager"
    "config.providers.secretManager.param.aws.region" = "us-west-2" # !! Important !!
  }

  kafka_cluster {
    apache_kafka_cluster {
      # Assumes a provisioned aws_msk_cluster named demo_msk_cluster, as in the related MSK article.
      # The serverless cluster in msk.tf only supports IAM authentication, so it would need
      # its own bootstrap string and authentication_type = "IAM" below.
      bootstrap_servers = aws_msk_cluster.demo_msk_cluster.bootstrap_brokers_tls

      vpc {
        security_groups = [aws_security_group.demo_msk_sg.id]
        subnets = [
          aws_subnet.subnet_az1.id,
          aws_subnet.subnet_az2.id,
          aws_subnet.subnet_az3.id,
        ]
      }
    }
  }

  kafka_cluster_client_authentication {
    # "NONE" or "IAM"
    authentication_type = "NONE"
  }

  # https://docs.aws.amazon.com/ja_jp/MSKC/latest/mskc/API_KafkaClusterEncryptionInTransit.html
  # Valid Values: PLAINTEXT | TLS
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.arn
      revision = aws_mskconnect_custom_plugin.demo_msk_connect_custom_plugin.latest_revision
    }
  }

  service_execution_role_arn = aws_iam_role.demo_msk_connector_service_execution_role.arn
}
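In Apache Kafka Connect, `config.providers` is a worker-level setting, so if the placeholders are not resolved with the provider keys in the connector configuration, one thing to try is moving them into a custom worker configuration. The following is only a sketch and not something verified in this post: the resource name and the converter settings are assumptions, and the region has to match wherever the secret lives.

```hcl
resource "aws_mskconnect_worker_configuration" "example_worker_configuration" {
  name = "example-worker-configuration"

  # Worker-level properties, including the config provider definition.
  properties_file_content = <<-EOT
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    config.providers=secretManager
    config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
    config.providers.secretManager.param.aws.region=us-west-2
  EOT
}

# Inside the aws_mskconnect_connector resource, the worker configuration
# would then be attached with a block like this:
#
#   worker_configuration {
#     arn      = aws_mskconnect_worker_configuration.example_worker_configuration.arn
#     revision = aws_mskconnect_worker_configuration.example_worker_configuration.latest_revision
#   }
```

With this placement, the connector configuration only needs to keep the `${secretManager:...}` references.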
References
https://www.entechlog.com/blog/kafka/integrating-msk-connect-with-msk/
https://lenses.io/blog/2023/04/how-to-protect-and-rotate-secrets-for-apache-kafka-connectors-aws-secret-manager/
https://awstip.com/aws-msk-connect-the-all-story-part-3-3-connector-configuration-1417f51900ce
Related articles
Terraform ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/05/000224
Terraform ~ Getting started ~
https://dk521123.hatenablog.com/entry/2019/12/09/222057
Terraform ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/05/03/000000
Terraform ~ Terraform odds and ends ~
https://dk521123.hatenablog.com/entry/2023/05/15/205352
Terraform ~ AWS IAM ~
https://dk521123.hatenablog.com/entry/2023/04/12/214311
Terraform ~ AWS S3 ~
https://dk521123.hatenablog.com/entry/2023/04/09/104204
Terraform ~ AWS CloudWatch ~
https://dk521123.hatenablog.com/entry/2023/05/17/123335
Terraform ~ AWS KMS ~
https://dk521123.hatenablog.com/entry/2023/05/26/000000
Terraform ~ AWS Secrets Manager ~
https://dk521123.hatenablog.com/entry/2023/04/11/152801
Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215
Terraform ~ AWS MSK Connect ~
https://dk521123.hatenablog.com/entry/2023/05/25/000000
Amazon MSK ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ Basics / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Kafka Connect ~ Connector configuration properties ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131