GH-7646: Additional Kafka Properties #7672

Closed
wants to merge 3 commits into from
@@ -71,6 +71,11 @@ public class KafkaProperties {
*/
private String clientId;

/**
* Additional properties used to configure the client.
*/
private Map<String, String> properties = new HashMap<String, String>();

public Consumer getConsumer() {
return this.consumer;
}
@@ -107,6 +112,14 @@ public void setClientId(String clientId) {
this.clientId = clientId;
}

public Map<String, String> getProperties() {
return this.properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.bootstrapServers != null) {
@@ -135,6 +148,9 @@ private Map<String, Object> buildCommonProperties() {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
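// the additional user-specified properties are applied last, so a matching key
// here overrides any of the explicitly mapped settings above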
if (this.properties != null && this.properties.size() > 0) {
properties.putAll(this.properties);
}
return properties;
}

@@ -240,6 +256,11 @@ public static class Consumer {
*/
private Class<?> valueDeserializer = StringDeserializer.class;

/**
* Maximum number of records returned in a single call to poll().
*/
private Integer maxPollRecords;

public Ssl getSsl() {
return this.ssl;
}
@@ -332,6 +353,14 @@ public void setValueDeserializer(Class<?> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}

public Integer getMaxPollRecords() {
return this.maxPollRecords;
}

public void setMaxPollRecords(Integer maxPollRecords) {
this.maxPollRecords = maxPollRecords;
}

public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.autoCommitInterval != null) {
@@ -395,6 +424,9 @@ public Map<String, Object> buildProperties() {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
}
if (this.maxPollRecords != null) {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
}
return properties;
}

@@ -61,12 +61,16 @@ public void closeContext() {
@Test
public void consumerProperties() {
load("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar",
"spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
@@ -109,6 +113,11 @@ public void consumerProperties() {
.isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG))
.isEqualTo(42);
assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
}

@Test
@@ -873,6 +873,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
@@ -888,6 +889,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.
22 changes: 11 additions & 11 deletions spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc
@@ -4615,22 +4615,22 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM
and any that do not have a default value.

Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
class. If you wish to configure the producer or consumer with additional properties that
are not directly supported, use the following:

`spring.kafka.properties.foo.bar=baz`

This sets the common `foo.bar` kafka property to `baz`.
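
As a minimal sketch (not part of this change, and only using the `getProperties()`,
`buildConsumerProperties()` and `buildProducerProperties()` methods shown in this pull
request), the binding above is roughly equivalent to:

[source,java,indent=0]
----
KafkaProperties properties = new KafkaProperties();
properties.getProperties().put("foo.bar", "baz");

// the entry is merged into the common client configuration, so it shows up
// in the config maps backing both the consumer and the producer factories
Map<String, Object> consumerConfig = properties.buildConsumerProperties();
Map<String, Object> producerConfig = properties.buildProducerProperties();
----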

These properties will be shared by both the consumer and producer factory beans.
If you wish to customize these components with different properties, for example to use a
different metrics reporter for each, you can override the bean definitions, as follows:

[source,java,indent=0]
----
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
----

Member commented:
I think this sample should still be here, although re-phrased. There are some cases that
aren't covered by the current KafkaProperties (e.g.
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG), and I may well be interested in
different values for the consumer and for the producer respectively.

Member commented:
Can we put this code in a file in
https://github.com/spring-projects/spring-boot/tree/master/spring-boot-docs/src/main/java/org/springframework/boot
and then include it in the docs please? This commit provides an example.

Contributor Author commented:
Sweet - will do 😄



[[boot-features-restclient]]
== Calling REST services
If you need to call remote REST services from your application, you can use Spring
@@ -0,0 +1,125 @@
/*
* Copyright 2016-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.kafka;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

/**
* Example custom kafka configuration beans used when the user wants to
* apply different common properties to the producer and consumer.
*
* @author Gary Russell
* @since 1.5
*
*/
public class KafkaSpecialProducerConsumerConfigExample {

// tag::configuration[]
@Configuration
public static class CustomKafkaBeans {

/**
* Customized ProducerFactory bean.
* @param properties the kafka properties.
* @return the bean.
*/
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MyProducerMetricsReporter.class);
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}

/**
* Customized ConsumerFactory bean.
* @param properties the kafka properties.
* @return the bean.
*/
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MyConsumerMetricsReporter.class);
return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
}

}
// end::configuration[]

public static class MyConsumerMetricsReporter implements MetricsReporter {

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public void init(List<KafkaMetric> metrics) {
}

@Override
public void metricChange(KafkaMetric metric) {
}

@Override
public void metricRemoval(KafkaMetric metric) {
}

@Override
public void close() {
}

}

public static class MyProducerMetricsReporter implements MetricsReporter {

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public void init(List<KafkaMetric> metrics) {
}

@Override
public void metricChange(KafkaMetric metric) {
}

@Override
public void metricRemoval(KafkaMetric metric) {
}

@Override
public void close() {
}

}

}