introduced two packages, kafka-clients and spring-kafka, version 0.11.0.0 and 1.3.8 respectively. Some posts on the Internet that integrate spring with kafka do not mention how to add user authentication, that is, security.protocol and sasl.mechanism as well as kafka_client_jaas configuration files. I see that the configuration of spring boot is useful on the Internet, but I don"t know how to convert it to xml? After using the configuration method I found on the Internet, whether I put it in consumerProperties or containerProperties, I will report an error. I can"t set it. I don"t know if it"s the version or the setting.
spring boot Code:
consumer:
bootstrap-servers: 192.168.186.130:9092,192.168.186.131:9092,192.168.186.132:9092
group-id: group-1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
my code:
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="${group.id}"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="10"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
<!-- <property name="security.protocol" value="${security.protocol}"/>
<property name="sasl.mechanism" value="${sasl.mechanism}"/> -->
</bean>
<!-- consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- -->
<bean id="messageListernerConsumerService" class="com.rongdu.zgj.modules.dsc.kafka.KafkaConsumerListener"/>
<!-- -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="${topic}"/>
<property name="messageListener" ref="messageListernerConsumerService"/>
</bean>
<!-- kafkatemplate beanbeantemplatesend -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="5"/>
</bean>