IAM Authentication

Amazon MSK integrates with AWS IAM for authentication, using IAM roles and policies to control access. Compared with Kafka ACLs or SASL/SCRAM password-based control, this approach is much simpler to operate, and is strongly recommended.

Create an MSK cluster using AWS_MSK_IAM


Connect using an admin user

This should give me control over all the resources within my cluster-level permissions: cluster, topic, group, and transactional ID.

You need to make sure to download access keys and secret access keys for this IAM user and set them as a new profile on your computer:

$ aws configure --profile stephane-msk
AWS Access Key ID [None]: AKIAVUITFK3*******
AWS Secret Access Key [None]: QFO60Ce5YHdav********
Default region name [None]: us-east-1
Default output format [None]: json

I can retrieve the bootstrap servers via the AWS Management Console or the AWS Command Line Interface (AWS CLI).
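As a sketch, the CLI route looks like the following (the cluster ARN is a placeholder you must substitute; `BootstrapBrokerStringSaslIam` is the field the API returns for IAM-enabled clusters):

```shell
# Replace <CLUSTER_ARN> with your cluster's ARN before running.
aws kafka get-bootstrap-brokers \
  --cluster-arn <CLUSTER_ARN> \
  --query BootstrapBrokerStringSaslIam \
  --output text
```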

Next, I need to specify the connectivity settings to Apache Kafka, which for Java clients is the following:

# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="stephane-msk";
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

The ssl.truststore.location is unnecessary if your JDK distribution already trusts the TLS certificate of the MSK brokers, which should be the case for all standard JDK distributions, because the brokers' certificates are issued by a publicly trusted CA.

I’ve included awsProfileName="stephane-msk" to make sure I point to the credentials of my newly configured profile. For more information, see the README of the Amazon MSK IAM client auth libraries.

Conduktor is now connected to the MSK cluster, and we have full access to the cluster thanks to the managed IAM policy. Note how I didn’t have to “apply” the IAM policy. As soon as it was created and attached to my user, the MSK cluster picked it up. Let’s make sure by creating a topic, producing some data to it, and consuming data from it.


Create an Apache Kafka topic from an EC2 instance

Let’s create an IAM role, for now without any IAM policies. We’ll call it DemoMSKClient.

Create an Amazon Elastic Compute Cloud (Amazon EC2) instance using Amazon Linux 2 and attach the DemoMSKClient role.

Using EC2 Instance Connect, let's verify that the EC2 instance has network connectivity to the MSK cluster:

$ sudo yum install -y nc
[ec2-user@ip-172-31-58-110 ~]$ nc -vz b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com 9098
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 172.31.33.225:9098.
Ncat: 0 bytes sent, 0 bytes received in 0.03 seconds.

Next, install the Apache Kafka clients:

#!/bin/bash
# Run privileged commands via sudo; a bare `sudo su` inside a script
# would block and leave the following commands running as ec2-user.
sudo yum update -y
sudo yum install -y java-11-amazon-corretto-headless
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xzf kafka_2.13-2.5.0.tgz
sudo mv kafka_2.13-2.5.0 /usr/local/
# Strip the .sh extension so the tools are callable as kafka-topics, etc.
for i in /usr/local/kafka_2.13-2.5.0/bin/*.sh; do
   sudo mv "$i" "${i%.sh}"
done
sudo ln -sfn /usr/local/kafka_2.13-2.5.0 /usr/local/kafka
# kafka-run-class.sh is invoked by that name from the other scripts, so restore a copy
sudo cp /usr/local/kafka_2.13-2.5.0/bin/kafka-run-class /usr/local/kafka_2.13-2.5.0/bin/kafka-run-class.sh
echo 'export PATH=/usr/local/kafka/bin:$PATH' >> /home/ec2-user/.bashrc

Now, let’s download the JAR file to give our clients the extra classes they need to work with Amazon MSK IAM security (make sure to change the version in the following code, in case you’re not downloading v1.0.0).
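For example, the v1.0.0 all-in-one JAR can be fetched from the project's GitHub releases. The exact release tag and file name below are assumptions; verify them against the aws/aws-msk-iam-auth releases page if the download fails:

```shell
cd /home/ec2-user
# Assumed release URL; check the aws/aws-msk-iam-auth releases page for the current version.
wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.0.0/aws-msk-iam-auth-1.0.0-all.jar
```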

Create a client configuration file named client-config.properties:

# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

The truststore isn’t needed because Amazon Linux 2 with Amazon Corretto 11 trusts the certificate by default.

Additionally, our EC2 instance is configured with a role, so the IAMLoginModule automatically uses that role.

Now let’s create an Apache Kafka topic:

export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.0.0-all.jar
kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties

We export the CLASSPATH so the Kafka CLI picks up the Amazon MSK IAM library; alternatively, we could copy the JAR file directly into the Kafka libs directory.

We get an authentication error, which is by design — the role doesn't yet have permission to connect to the cluster:

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [174b92a5-b2d6-43d9-beef-af4b8a2dd1f6]: Access denied (kafka.admin.TopicCommand$)

Let’s fix this by granting the kafka-cluster:Connect action to our IAM role:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*"
           ]
       }
   ]
}
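One way to attach it is as an inline policy via the AWS CLI. This is a sketch: the policy name msk-connect and the file name connect-policy.json are my own choices, not from the walkthrough:

```shell
# Save the JSON document above as connect-policy.json, then attach it
# as an inline policy on the DemoMSKClient role (the policy name is arbitrary).
aws iam put-role-policy \
  --role-name DemoMSKClient \
  --policy-name msk-connect \
  --policy-document file://connect-policy.json
```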

Let’s try running the command again:

kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties

Now we’re connected to the cluster, but we get an authorization error. This is by design: IAM access control follows least privilege, and no permission yet allows us to create a topic. Let’s fix it by adding the minimum privileges we need to create that topic:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
               "kafka-cluster:CreateTopic"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
                "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic"
           ]
       }
   ]
}
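The topic and group ARNs follow a fixed pattern relative to the cluster ARN: swap :cluster/ for :topic/ (or :group/) and append the resource name after the cluster-UUID wildcard. A quick sketch, using the placeholder account ID and names from this post:

```shell
CLUSTER_ARN="arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*"
# Derive topic and group ARNs by swapping the resource type and
# appending the resource name after the cluster-UUID wildcard.
TOPIC_ARN="$(echo "$CLUSTER_ARN" | sed 's|:cluster/|:topic/|')/my-topic"
GROUP_ARN="$(echo "$CLUSTER_ARN" | sed 's|:cluster/|:group/|')/my-group"
echo "$TOPIC_ARN"   # arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic
echo "$GROUP_ARN"   # arn:aws:kafka:us-east-1:123456123456:group/stephane-cluster/*/my-group
```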

Let’s try creating the topic again:

$ kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties
Created topic my-topic.

Perfect!

Note that if I try to list the topics, I don’t see my topic, because I didn’t add a describe permission to my IAM policy. Fine-grained access control at its finest!

$ kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --list --command-config client-config.properties

Produce from an EC2 instance

Let’s try to produce to the topic we created previously:

$ kafka-console-producer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --producer.config client-config.properties --topic my-topic
> hello
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic]

Now we’re connected to the MSK cluster, but we get an authorization error. We can fix it by adding the minimum privilege we need to write to that topic:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
               "kafka-cluster:DescribeTopic",
               "kafka-cluster:WriteData"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
                "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic"
           ]
       }
   ]
}

Let’s try producing again:

kafka-console-producer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --producer.config client-config.properties --topic my-topic

Consume from an EC2 instance

We use the same EC2 instance for simplicity’s sake, but you could create a separate EC2 instance with a separate role as an exercise.

Let’s try consuming from the topic, using the group my-group:

kafka-console-consumer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --topic my-topic --from-beginning --consumer.config client-config.properties --group my-group

We get an authorization exception. We can fix this by adding an IAM policy:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
               "kafka-cluster:DescribeGroup",
               "kafka-cluster:AlterGroup"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
               "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic",
               "arn:aws:kafka:us-east-1:123456123456:group/stephane-cluster/*/my-group"
           ]
       }
   ]
}

Let’s try the command again, which should work:

kafka-console-consumer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --topic my-topic --from-beginning --consumer.config client-config.properties --group my-group

Java-based code

If you’re using the Java clients (such as Java Producer, Consumer, Kafka Streams, or Kafka Connect), you’re in luck—all you need to do is to reference the Java library using Maven:

<dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>1.0.0</version>
</dependency>

Then you specify the necessary Kafka properties:

ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

Takeaways

  • When IAM authentication is enabled, a client without the corresponding IAM permissions for MSK cannot access the cluster at all.