
Big Data: Adapting Ranger 2.1.0 to Kafka 3.4.0

红桃∩ 2024-07-05 00:01:02

Adapting Ranger 2.1.0 to Kafka 3.4.0

Per the official notes, Kafka 3.0 and later replace the authorization API, including changes to the classes and methods involved, so the corresponding logic in Ranger's Kafka integration needs to be updated to match.

Official notes

Kafka 3.0 and later replace the authorization API, including changes to the classes and methods involved.

GitHub PR: https://github.com/apache/kafka/pull/10450
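One visible consequence of the new API is that it is asynchronous in places: createAcls/deleteAcls return lists of CompletionStage results and start() returns per-endpoint futures. A stdlib-only Java sketch of that result style (the AclCreateResult payload is stood in for by a plain String; the class and method names here are illustrative, not Kafka's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Stdlib-only sketch of the CompletionStage-based result style used by the
// new org.apache.kafka.server.authorizer.Authorizer API (see the PR above).
public class AsyncResultSketch {
    // Mirrors the shape of createAcls(): one already-completed stage per input.
    public static List<CompletionStage<String>> completedResults(List<String> inputs) {
        List<CompletionStage<String>> out = new ArrayList<>();
        for (String in : inputs) {
            out.add(CompletableFuture.completedFuture("created:" + in));
        }
        return out;
    }

    public static void main(String[] args) {
        for (CompletionStage<String> s : completedResults(List.of("acl-1"))) {
            // join() is safe here because every stage is already completed
            System.out.println(s.toCompletableFuture().join()); // created:acl-1
        }
    }
}
```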


POM

apache-ranger/pom.xml: this file mainly sets the Kafka version and the Scala version.

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        ...
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        ...
    </properties>

</project>

apache-ranger/plugin-schema-registry/pom.xml: this file likewise sets the Kafka version and the Scala-suffixed Kafka artifact.

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        <kafkaArtifact>kafka_2.12</kafkaArtifact>
        ...
    </properties>

</project>

Code

apache-ranger/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java

This class is where most of the changes land, covering both classes and methods, so the work amounts to refactoring the existing logic onto Kafka's new API. The main changes are the authorize method and the mapping of operations and resource types.

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;

import org.apache.commons.lang.StringUtils;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.service.RangerBasePlugin;

import org.apache.ranger.plugin.util.RangerPerfTracer;

public class RangerKafkaAuthorizer implements Authorizer {
	public static final Log logger = LogFactory.getLog(RangerKafkaAuthorizer.class);

	private static final Log PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");
	private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();

	public static final String KEY_TOPIC = "topic";
	public static final String KEY_CLUSTER = "cluster";
	public static final String KEY_CONSUMER_GROUP = "consumergroup";
	public static final String KEY_TRANSACTIONALID = "transactionalid";
	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";

	public static final String ACCESS_TYPE_READ = "consume";
	public static final String ACCESS_TYPE_WRITE = "publish";
	public static final String ACCESS_TYPE_CREATE = "create";
	public static final String ACCESS_TYPE_DELETE = "delete";
	public static final String ACCESS_TYPE_CONFIGURE = "configure";
	public static final String ACCESS_TYPE_DESCRIBE = "describe";
	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";

	private static volatile RangerBasePlugin rangerPlugin = null;
	RangerKafkaAuditHandler auditHandler = null;

	public RangerKafkaAuthorizer() {
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.apache.kafka.common.Configurable#configure(java.util.Map)
	 */
	@Override
	public void configure(Map<String, ?> configs) {
		RangerBasePlugin me = rangerPlugin;
		if (me == null) {
			synchronized(RangerKafkaAuthorizer.class) {
				me = rangerPlugin;
				if (me == null) {
					try {
						// Possible to override JAAS configuration which is used by Ranger, otherwise
						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
						// if it's not defined, then it reverts to 'KafkaServer' configuration.
						final Object jaasContext = configs.get("ranger.jaas.context");
						final String listenerName = (jaasContext instanceof String
										&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
										: SecurityProtocol.SASL_PLAINTEXT.name();
						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
						MiscUtil.setUGIFromJAASConfig(context.name());
						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
					} catch (Throwable t) {
						logger.error("Error getting principal.", t);
					}
					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
				}
			}
		}
		logger.info("Calling plugin.init()");
		rangerPlugin.init();
		auditHandler = new RangerKafkaAuditHandler();
		rangerPlugin.setResultProcessor(auditHandler);
	}

	@Override
	public void close() {
		logger.info("close() called on authorizer.");
		try {
			if (rangerPlugin != null) {
				rangerPlugin.cleanup();
			}
		} catch (Throwable t) {
			logger.error("Error closing RangerPlugin.", t);
		}
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
		List<AuthorizationResult> authResults = new ArrayList<>();
		AuthorizableRequestContext session = authorizableRequestContext;

		for (Action actionData : list) {
			ResourcePattern resource = actionData.resourcePattern();
			AclOperation operation = actionData.operation();
			if (rangerPlugin == null) {
				MiscUtil.logErrorMessageByInterval(logger,
						"Authorizer is still not initialized");
				return new ArrayList<>();
			}

			RangerPerfTracer perf = null;

			if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
				perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
			}
			String userName = null;
			if (session.principal() != null) {
				userName = session.principal().getName();
			}
			java.util.Set<String> userGroups = MiscUtil
					.getGroupsForRequestUser(userName);
			String ip = session.clientAddress().getHostAddress();

			// skip leading slash
			if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
				ip = ip.substring(1);
			}

			Date eventTime = new Date();
			String accessType = mapToRangerAccessType(operation);
			boolean validationFailed = false;
			String validationStr = "";

			if (accessType == null) {
				if (MiscUtil.logErrorMessageByInterval(logger,
						"Unsupported access type. operation=" + operation)) {
					logger.error("Unsupported access type. session=" + session
							+ ", operation=" + operation + ", resource=" + resource);
				}
				validationFailed = true;
				validationStr += "Unsupported access type. operation=" + operation;
			}
			String action = accessType;

			RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
			rangerRequest.setUser(userName);
			rangerRequest.setUserGroups(userGroups);
			rangerRequest.setClientIPAddress(ip);
			rangerRequest.setAccessTime(eventTime);

			RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
			rangerRequest.setResource(rangerResource);
			rangerRequest.setAccessType(accessType);
			rangerRequest.setAction(action);
			rangerRequest.setRequestData(resource.name());

			if (resource.resourceType().equals(ResourceType.TOPIC)) {
				rangerResource.setValue(KEY_TOPIC, resource.name());
			} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
				rangerResource.setValue(KEY_CLUSTER, resource.name());
			} else if (resource.resourceType().equals(ResourceType.GROUP)) {
				rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
			} else if (resource.resourceType().equals(ResourceType.TRANSACTIONAL_ID)) {
				rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
			} else if (resource.resourceType().equals(ResourceType.DELEGATION_TOKEN)) {
				rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
			} else {
				logger.error("Unsupported resourceType=" + resource.resourceType());
				validationFailed = true;
			}

			AuthorizationResult authResult = AuthorizationResult.DENIED;
			boolean returnValue = false;
			if (validationFailed) {
				MiscUtil.logErrorMessageByInterval(logger, validationStr
						+ ", request=" + rangerRequest);
			} else {

				try {
					RangerAccessResult result = rangerPlugin
							.isAccessAllowed(rangerRequest);
					if (result == null) {
						logger.error("Ranger Plugin returned null. Returning false");
					} else {
						returnValue = result.getIsAllowed();
						authResult = returnValue ? AuthorizationResult.ALLOWED : authResult;
					}
				} catch (Throwable t) {
					logger.error("Error while calling isAccessAllowed(). request="
							+ rangerRequest, t);
				} finally {
					auditHandler.flushAudit();
				}
			}
			RangerPerfTracer.log(perf);

			if (logger.isDebugEnabled()) {
				logger.debug("rangerRequest=" + rangerRequest + ", return="
							+ returnValue);
			}

			authResults.add(authResult);
		}
		return authResults;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		logger.error("createAcls(AuthorizableRequestContext, List<AclBinding>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		logger.error("deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter filter) {
		logger.error("acls(AclBindingFilter) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	/**
	 * Maps a Kafka {@link AclOperation} to the corresponding Ranger access type.
	 *
	 * @param operation the Kafka ACL operation being authorized
	 * @return the Ranger access type, or null if the operation is unsupported
	 */
	private String mapToRangerAccessType(AclOperation operation) {
		if (operation.equals(AclOperation.READ)) {
			return ACCESS_TYPE_READ;
		} else if (operation.equals(AclOperation.WRITE)) {
			return ACCESS_TYPE_WRITE;
		} else if (operation.equals(AclOperation.ALTER)) {
			return ACCESS_TYPE_CONFIGURE;
		} else if (operation.equals(AclOperation.DESCRIBE)) {
			return ACCESS_TYPE_DESCRIBE;
		} else if (operation.equals(AclOperation.CLUSTER_ACTION)) {
			return ACCESS_TYPE_CLUSTER_ACTION;
		} else if (operation.equals(AclOperation.CREATE)) {
			return ACCESS_TYPE_CREATE;
		} else if (operation.equals(AclOperation.DELETE)) {
			return ACCESS_TYPE_DELETE;
		} else if (operation.equals(AclOperation.DESCRIBE_CONFIGS)) {
			return ACCESS_TYPE_DESCRIBE_CONFIGS;
		} else if (operation.equals(AclOperation.ALTER_CONFIGS)) {
			return ACCESS_TYPE_ALTER_CONFIGS;
		} else if (operation.equals(AclOperation.IDEMPOTENT_WRITE)) {
			return ACCESS_TYPE_IDEMPOTENT_WRITE;
		}
		return null;
	}

	@Override
	public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
		// from StandardAuthorizer
		Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
		for (Endpoint endpoint : serverInfo.endpoints()) {
			if (serverInfo.earlyStartListeners().contains(
					endpoint.listenerName().orElseGet(() -> ""))) {
				result.put(endpoint, CompletableFuture.completedFuture(null));
			} else {
				result.put(endpoint, initialLoadFuture);
			}
		}
		// logger.error("start() is not supported by Ranger for Kafka");
		return result;
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return Authorizer.super.authorizeByResourceType(requestContext, op, resourceType);
	}
}
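The mapToRangerAccessType() branch chain above is a straight one-to-one lookup; as a sanity check it can be mirrored stdlib-only (string keys stand in for the org.apache.kafka.common.acl.AclOperation constants; this helper is illustrative, not part of the patch):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, stdlib-only mirror of RangerKafkaAuthorizer.mapToRangerAccessType().
// String keys stand in for org.apache.kafka.common.acl.AclOperation names.
public class AccessTypeMapper {
    private static final Map<String, String> OP_TO_ACCESS = new HashMap<>();
    static {
        OP_TO_ACCESS.put("READ", "consume");
        OP_TO_ACCESS.put("WRITE", "publish");
        OP_TO_ACCESS.put("ALTER", "configure");
        OP_TO_ACCESS.put("DESCRIBE", "describe");
        OP_TO_ACCESS.put("CLUSTER_ACTION", "cluster_action");
        OP_TO_ACCESS.put("CREATE", "create");
        OP_TO_ACCESS.put("DELETE", "delete");
        OP_TO_ACCESS.put("DESCRIBE_CONFIGS", "describe_configs");
        OP_TO_ACCESS.put("ALTER_CONFIGS", "alter_configs");
        OP_TO_ACCESS.put("IDEMPOTENT_WRITE", "idempotent_write");
    }

    // Returns null for unsupported operations (e.g. ANY, ALL),
    // matching the authorizer's behavior above.
    public static String map(String aclOperation) {
        return OP_TO_ACCESS.get(aclOperation);
    }

    public static void main(String[] args) {
        System.out.println(map("READ")); // consume
        System.out.println(map("ALL"));  // null
    }
}
```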

apache-ranger/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java

This class needs no particularly special changes; the main work is updating method signatures to match the interface it now implements.

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.common.Endpoint;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;


//public class RangerKafkaAuthorizer extends Authorizer {
public class RangerKafkaAuthorizer implements Authorizer {
	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);

	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";

	private Authorizer              rangerKakfaAuthorizerImpl = null;
	private RangerPluginClassLoader rangerPluginClassLoader   = null;

	public RangerKafkaAuthorizer() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}

		this.init();

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}
	}
	
	private void init(){
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.init()");
		}

		try {
			
			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
			
			@SuppressWarnings("unchecked")
			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);

			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl = cls.newInstance();
		} catch (Exception e) {
			// check what need to be done
			LOG.error("Error Enabling RangerKafkaPlugin", e);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.init()");
		}
	}

	@Override
	public void configure(Map<String, ?> configs) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}

		try {
			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl.configure(configs);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}
	}

	@Override
	public void close() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.close()");
		}

		try {
			activatePluginClassLoader();
			
			rangerKakfaAuthorizerImpl.close();
		} catch (Throwable t) {
			LOG.error("Error closing RangerPlugin.", t);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.close()");
		}
		
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext=%s, List<Action>=%s)", authorizableRequestContext, list));
		}

		List<AuthorizationResult> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.authorize(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
		}
		
		return ret;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
		}

		List<? extends CompletionStage<AclCreateResult>> createAcls = new ArrayList<>();
		try {
			activatePluginClassLoader();

			createAcls = rangerKakfaAuthorizerImpl.createAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
		}
		return createAcls;
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
		}
		List<? extends CompletionStage<AclDeleteResult>> ret = new ArrayList<>();
		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.deleteAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
		}
		
		return ret;
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.acls(AclBindingFilter)");
		}

		Iterable<AclBinding> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.acls(aclBindingFilter);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.acls(AclBindingFilter)");
		}

		return ret;
	}

	@Override
	public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
		// Delegate to the underlying implementation; returning null here would
		// break broker startup, which consumes the per-endpoint futures.
		try {
			activatePluginClassLoader();

			return rangerKakfaAuthorizerImpl.start(authorizerServerInfo);
		} finally {
			deactivatePluginClassLoader();
		}
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return Authorizer.super.authorizeByResourceType(requestContext, op, resourceType);
	}
	
	private void activatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.activate();
		}
	}

	private void deactivatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.deactivate();
		}
	}
		
}

Notes

Two RangerKafkaAuthorizer classes are involved here:

  1. The class under ranger-kafka-plugin-shim exposes the entry points; Kafka loads and invokes it to perform authorization checks and return the results.

  2. The class under plugin-kafka implements the actual authorization logic.
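For completeness, a typical deployment points the broker at the shim class via Kafka's standard authorizer setting (this is the usual Ranger plugin wiring, shown here as a hedged example; adjust to your installation):

```properties
# server.properties (broker side)
authorizer.class.name=org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer
```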
