Skip to content

Commit bdda470

Browse files
garyrussellphilwebb
authored andcommitted
Support arbitrary Kafka properties
Add support for arbitrary Kafka properties via `spring.kafka.properties.*` and also a `spring.kafka.max.poll.records` property. See gh-7672
1 parent ef671e7 commit bdda470

File tree

5 files changed

+179
-11
lines changed

5 files changed

+179
-11
lines changed

spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public class KafkaProperties {
7171
*/
7272
private String clientId;
7373

74+
/**
75+
* Additional properties used to configure the client.
76+
*/
77+
private Map<String, String> properties = new HashMap<String, String>();
78+
7479
public Consumer getConsumer() {
7580
return this.consumer;
7681
}
@@ -107,6 +112,14 @@ public void setClientId(String clientId) {
107112
this.clientId = clientId;
108113
}
109114

115+
public Map<String, String> getProperties() {
116+
return this.properties;
117+
}
118+
119+
public void setProperties(Map<String, String> properties) {
120+
this.properties = properties;
121+
}
122+
110123
private Map<String, Object> buildCommonProperties() {
111124
Map<String, Object> properties = new HashMap<String, Object>();
112125
if (this.bootstrapServers != null) {
@@ -135,6 +148,9 @@ private Map<String, Object> buildCommonProperties() {
135148
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
136149
this.ssl.getTruststorePassword());
137150
}
151+
if (this.properties != null && this.properties.size() > 0) {
152+
properties.putAll(this.properties);
153+
}
138154
return properties;
139155
}
140156

@@ -240,6 +256,11 @@ public static class Consumer {
240256
*/
241257
private Class<?> valueDeserializer = StringDeserializer.class;
242258

259+
/**
260+
* Maximum number of records returned in a single call to poll().
261+
*/
262+
private Integer maxPollRecords;
263+
243264
public Ssl getSsl() {
244265
return this.ssl;
245266
}
@@ -332,6 +353,14 @@ public void setValueDeserializer(Class<?> valueDeserializer) {
332353
this.valueDeserializer = valueDeserializer;
333354
}
334355

356+
public Integer getMaxPollRecords() {
357+
return this.maxPollRecords;
358+
}
359+
360+
public void setMaxPollRecords(Integer maxPollRecords) {
361+
this.maxPollRecords = maxPollRecords;
362+
}
363+
335364
public Map<String, Object> buildProperties() {
336365
Map<String, Object> properties = new HashMap<String, Object>();
337366
if (this.autoCommitInterval != null) {
@@ -395,6 +424,9 @@ public Map<String, Object> buildProperties() {
395424
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
396425
this.valueDeserializer);
397426
}
427+
if (this.maxPollRecords != null) {
428+
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
429+
}
398430
return properties;
399431
}
400432

spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,16 @@ public void closeContext() {
6161
@Test
6262
public void consumerProperties() {
6363
load("spring.kafka.bootstrap-servers=foo:1234",
64+
"spring.kafka.properties.foo=bar",
65+
"spring.kafka.properties.baz=qux",
66+
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
6467
"spring.kafka.ssl.key-password=p1",
6568
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
6669
"spring.kafka.ssl.keystore-password=p2",
6770
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
6871
"spring.kafka.ssl.truststore-password=p3",
6972
"spring.kafka.consumer.auto-commit-interval=123",
73+
"spring.kafka.consumer.max-poll-records=42",
7074
"spring.kafka.consumer.auto-offset-reset=earliest",
7175
"spring.kafka.consumer.client-id=ccid", // test override common
7276
"spring.kafka.consumer.enable-auto-commit=false",
@@ -109,6 +113,11 @@ public void consumerProperties() {
109113
.isEqualTo(LongDeserializer.class);
110114
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
111115
.isEqualTo(IntegerDeserializer.class);
116+
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG))
117+
.isEqualTo(42);
118+
assertThat(configs.get("foo")).isEqualTo("bar");
119+
assertThat(configs.get("baz")).isEqualTo("qux");
120+
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
112121
}
113122

114123
@Test

spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,7 @@ content into your application; rather pick only the properties that you need.
878878
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
879879
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
880880
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
881+
spring.kafka.consumer.max-poll-messages= # Maximum number of records returned in a single call to poll().
881882
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
882883
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
883884
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
@@ -893,6 +894,7 @@ content into your application; rather pick only the properties that you need.
893894
spring.kafka.producer.key-serializer= # Serializer class for keys.
894895
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
895896
spring.kafka.producer.value-serializer= # Serializer class for values.
897+
spring.kafka.properties.*= # Additional properties used to configure the client.
896898
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
897899
spring.kafka.ssl.keystore-location= # Location of the key store file.
898900
spring.kafka.ssl.keystore-password= # Store password for the key store file.

spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4643,22 +4643,22 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM
46434643
and any that do not have a default value.
46444644

46454645
Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
4646-
class. If you wish to configure the producer or consumer with additional properties, you
4647-
can override the producer factory and/or consumer factory bean, adding additional
4648-
properties, for example:
4646+
class. If you wish to configure the producer or consumer with additional properties that
4647+
are not directly supported, use the following:
4648+
4649+
`spring.kafka.properties.foo.bar=baz`
4650+
4651+
This sets the common `foo.bar` kafka property to `baz`.
4652+
4653+
These properties will be shared by both the consumer and producer factory beans.
4654+
If you wish to customize these components with different properties, such as to use a
4655+
different metrics reader for each, you can override the bean definitions, as follows:
46494656

46504657
[source,java,indent=0]
46514658
----
4652-
@Bean
4653-
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
4654-
Map<String, Object> producerProperties = properties.buildProducerProperties();
4655-
producerProperties.put("some.property", "some.value");
4656-
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
4657-
}
4659+
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
46584660
----
46594661

4660-
4661-
46624662
[[boot-features-restclient]]
46634663
== Calling REST services
46644664
If you need to call remote REST services from your application, you can use Spring
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2016-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.kafka;
18+
19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.clients.CommonClientConfigs;
23+
import org.apache.kafka.common.metrics.KafkaMetric;
24+
import org.apache.kafka.common.metrics.MetricsReporter;
25+
26+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.core.ConsumerFactory;
30+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
31+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
32+
import org.springframework.kafka.core.ProducerFactory;
33+
34+
/**
35+
* Example custom kafka configuration beans used when the user wants to
36+
* apply different common properties to the producer and consumer.
37+
*
38+
* @author Gary Russell
39+
* @since 1.5
40+
*
41+
*/
42+
public class KafkaSpecialProducerConsumerConfigExample {
43+
44+
// tag::configuration[]
45+
@Configuration
46+
public static class CustomKafkaBeans {
47+
48+
/**
49+
* Customized ProducerFactory bean.
50+
* @param properties the kafka properties.
51+
* @return the bean.
52+
*/
53+
@Bean
54+
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
55+
Map<String, Object> producerProperties = properties.buildProducerProperties();
56+
producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
57+
MyProducerMetricsReporter.class);
58+
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
59+
}
60+
61+
/**
62+
* Customized ConsumerFactory bean.
63+
* @param properties the kafka properties.
64+
* @return the bean.
65+
*/
66+
@Bean
67+
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
68+
Map<String, Object> consumererProperties = properties.buildConsumerProperties();
69+
consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
70+
MyConsumerMetricsReporter.class);
71+
return new DefaultKafkaConsumerFactory<Object, Object>(consumererProperties);
72+
}
73+
74+
}
75+
// end::configuration[]
76+
77+
public static class MyConsumerMetricsReporter implements MetricsReporter {
78+
79+
@Override
80+
public void configure(Map<String, ?> configs) {
81+
}
82+
83+
@Override
84+
public void init(List<KafkaMetric> metrics) {
85+
}
86+
87+
@Override
88+
public void metricChange(KafkaMetric metric) {
89+
}
90+
91+
@Override
92+
public void metricRemoval(KafkaMetric metric) {
93+
}
94+
95+
@Override
96+
public void close() {
97+
}
98+
99+
}
100+
101+
public static class MyProducerMetricsReporter implements MetricsReporter {
102+
103+
@Override
104+
public void configure(Map<String, ?> configs) {
105+
}
106+
107+
@Override
108+
public void init(List<KafkaMetric> metrics) {
109+
}
110+
111+
@Override
112+
public void metricChange(KafkaMetric metric) {
113+
}
114+
115+
@Override
116+
public void metricRemoval(KafkaMetric metric) {
117+
}
118+
119+
@Override
120+
public void close() {
121+
}
122+
123+
}
124+
125+
}

0 commit comments

Comments
 (0)