diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index c6ed49f49032..c7e9fd45290c 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -71,6 +71,11 @@ public class KafkaProperties { */ private String clientId; + /** + * Additional properties used to configure the client. + */ + private Map properties = new HashMap(); + public Consumer getConsumer() { return this.consumer; } @@ -107,6 +112,14 @@ public void setClientId(String clientId) { this.clientId = clientId; } + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + private Map buildCommonProperties() { Map properties = new HashMap(); if (this.bootstrapServers != null) { @@ -135,6 +148,9 @@ private Map buildCommonProperties() { properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword()); } + if (this.properties != null && this.properties.size() > 0) { + properties.putAll(this.properties); + } return properties; } @@ -240,6 +256,11 @@ public static class Consumer { */ private Class valueDeserializer = StringDeserializer.class; + /** + * Maximum number of records returned in a single call to poll(). + */ + private Integer maxPollRecords; + public Ssl getSsl() { return this.ssl; } @@ -332,6 +353,14 @@ public void setValueDeserializer(Class valueDeserializer) { this.valueDeserializer = valueDeserializer; } + public Integer getMaxPollRecords() { + return this.maxPollRecords; + } + + public void setMaxPollRecords(Integer maxPollRecords) { + this.maxPollRecords = maxPollRecords; + } + public Map buildProperties() { Map properties = new HashMap(); if (this.autoCommitInterval != null) { @@ -395,6 +424,9 @@ public Map buildProperties() { properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueDeserializer); } + if (this.maxPollRecords != null) { + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords); + } return properties; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index db3a8d8bae8f..63b92b8a5d1c 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -61,12 +61,16 @@ public void closeContext() { @Test public void consumerProperties() { load("spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.properties.foo=bar", + "spring.kafka.properties.baz=qux", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.ssl.key-password=p1", "spring.kafka.ssl.keystore-location=classpath:ksLoc", "spring.kafka.ssl.keystore-password=p2", "spring.kafka.ssl.truststore-location=classpath:tsLoc", "spring.kafka.ssl.truststore-password=p3", "spring.kafka.consumer.auto-commit-interval=123", + "spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.client-id=ccid", // test override common "spring.kafka.consumer.enable-auto-commit=false", @@ -109,6 +113,11 @@ public void consumerProperties() { .isEqualTo(LongDeserializer.class); assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerDeserializer.class); + assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) + .isEqualTo(42); + assertThat(configs.get("foo")).isEqualTo("bar"); + assertThat(configs.get("baz")).isEqualTo("qux"); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); } @Test diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 43267fbfce7d..745211dcb470 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -873,6 +873,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to. spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. + spring.kafka.consumer.max-poll-messages= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. @@ -888,6 +889,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.producer.key-serializer= # Serializer class for keys. spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends. spring.kafka.producer.value-serializer= # Serializer class for values. + spring.kafka.properties.*= # Additional properties used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file. spring.kafka.ssl.keystore-location= # Location of the key store file. spring.kafka.ssl.keystore-password= # Store password for the key store file. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index d7098af05bb7..46f53ac5933f 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4615,22 +4615,22 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM and any that do not have a default value. Only a subset of the properties supported by Kafka are available via the `KafkaProperties` -class. If you wish to configure the producer or consumer with additional properties, you -can override the producer factory and/or consumer factory bean, adding additional -properties, for example: +class. If you wish to configure the producer or consumer with additional properties that +are not directly supported, use the following: + +`spring.kafka.properties.foo.bar=baz` + +This sets the common `foo.bar` kafka property to `baz`. + +These properties will be shared by both the consumer and producer factory beans. +If you wish to customize these components with different properties, such as to use a +different metrics reader for each, you can override the bean definitions, as follows: [source,java,indent=0] ---- - @Bean - public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { - Map producerProperties = properties.buildProducerProperties(); - producerProperties.put("some.property", "some.value"); - return new DefaultKafkaProducerFactory(producerProperties); - } +include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration] ---- - - [[boot-features-restclient]] == Calling REST services If you need to call remote REST services from your application, you can use Spring diff --git a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java new file mode 100644 index 000000000000..43dfc60ce643 --- /dev/null +++ b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java @@ -0,0 +1,125 @@ +/* + * Copyright 2016-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.kafka; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; + +/** + * Example custom kafka configuration beans used when the user wants to + * apply different common properties to the producer and consumer. + * + * @author Gary Russell + * @since 1.5 + * + */ +public class KafkaSpecialProducerConsumerConfigExample { + + // tag::configuration[] + @Configuration + public static class CustomKafkaBeans { + + /** + * Customized ProducerFactory bean. + * @param properties the kafka properties. + * @return the bean. + */ + @Bean + public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { + Map producerProperties = properties.buildProducerProperties(); + producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MyProducerMetricsReporter.class); + return new DefaultKafkaProducerFactory(producerProperties); + } + + /** + * Customized ConsumerFactory bean. + * @param properties the kafka properties. + * @return the bean. + */ + @Bean + public ConsumerFactory kafkaConsumerFactory(KafkaProperties properties) { + Map consumererProperties = properties.buildConsumerProperties(); + consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MyConsumerMetricsReporter.class); + return new DefaultKafkaConsumerFactory(consumererProperties); + } + + } + // end::configuration[] + + public static class MyConsumerMetricsReporter implements MetricsReporter { + + @Override + public void configure(Map configs) { + } + + @Override + public void init(List metrics) { + } + + @Override + public void metricChange(KafkaMetric metric) { + } + + @Override + public void metricRemoval(KafkaMetric metric) { + } + + @Override + public void close() { + } + + } + + public static class MyProducerMetricsReporter implements MetricsReporter { + + @Override + public void configure(Map configs) { + } + + @Override + public void init(List metrics) { + } + + @Override + public void metricChange(KafkaMetric metric) { + } + + @Override + public void metricRemoval(KafkaMetric metric) { + } + + @Override + public void close() { + } + + } + +}