Skip to content

Commit 0454039

Browse files
vvcephei / John Roesler
authored and
John Roesler
committed
KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)
Implements KIP-695. Reverts a previous behavior change to Consumer.poll and replaces it with a new Consumer.currentLag API, which returns the client's currently cached lag. Uses this new API to implement the desired task idling semantics improvement from KIP-695. Reverts fdcf8fb / KAFKA-10866: Add metadata to ConsumerRecords (apache#9836). Reviewers: Chia-Ping Tsai <[email protected]>, Guozhang Wang <[email protected]>
1 parent fc8c9e1 commit 0454039

File tree

23 files changed

+203
-571
lines changed

23 files changed

+203
-571
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1143,7 +1143,6 @@ project(':clients') {
11431143
testCompile libs.bcpkix
11441144
testCompile libs.junitJupiter
11451145
testCompile libs.mockitoCore
1146-
testCompile libs.hamcrest
11471146

11481147
testRuntime libs.slf4jlog4j
11491148
testRuntime libs.jacksonDatabind

checkstyle/suppressions.xml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,7 @@
100100
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
101101

102102
<suppress checks="NPathComplexity"
103-
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
104-
105-
<suppress checks="CyclomaticComplexity"
106-
files="MockConsumer"/>
103+
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
107104

108105
<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
109106
files="Murmur3Test.java"/>

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collection;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.OptionalLong;
2930
import java.util.Set;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.regex.Pattern;
@@ -243,6 +244,11 @@ public interface Consumer<K, V> extends Closeable {
243244
*/
244245
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
245246

247+
/**
248+
* @see KafkaConsumer#currentLag(TopicPartition)
249+
*/
250+
OptionalLong currentLag(TopicPartition topicPartition);
251+
246252
/**
247253
* @see KafkaConsumer#groupMetadata()
248254
*/

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java

Lines changed: 2 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
*/
1717
package org.apache.kafka.clients.consumer;
1818

19-
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
2019
import org.apache.kafka.common.TopicPartition;
2120
import org.apache.kafka.common.utils.AbstractIterator;
2221

2322
import java.util.ArrayList;
2423
import java.util.Collections;
25-
import java.util.HashMap;
2624
import java.util.Iterator;
2725
import java.util.List;
2826
import java.util.Map;
@@ -34,99 +32,12 @@
3432
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
3533
*/
3634
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
37-
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
38-
Collections.emptyMap(),
39-
Collections.emptyMap()
40-
);
35+
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
4136

4237
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
43-
private final Map<TopicPartition, Metadata> metadata;
4438

45-
public static final class Metadata {
46-
47-
private final long receivedTimestamp;
48-
private final Long position;
49-
private final Long endOffset;
50-
51-
public Metadata(final long receivedTimestamp,
52-
final Long position,
53-
final Long endOffset) {
54-
this.receivedTimestamp = receivedTimestamp;
55-
this.position = position;
56-
this.endOffset = endOffset;
57-
}
58-
59-
/**
60-
* @return The timestamp of the broker response that contained this metadata
61-
*/
62-
public long receivedTimestamp() {
63-
return receivedTimestamp;
64-
}
65-
66-
/**
67-
* @return The next position the consumer will fetch, or null if the consumer has no position.
68-
*/
69-
public Long position() {
70-
return position;
71-
}
72-
73-
/**
74-
* @return The lag between the next position to fetch and the current end of the partition, or
75-
* null if the end offset is not known or there is no position.
76-
*/
77-
public Long lag() {
78-
return endOffset == null || position == null ? null : endOffset - position;
79-
}
80-
81-
/**
82-
* @return The current last offset in the partition. The determination of the "last" offset
83-
* depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
84-
* replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
85-
* replicated offset plus one or the smallest offset of any open transaction. Null if the end offset
86-
* is not known.
87-
*/
88-
public Long endOffset() {
89-
return endOffset;
90-
}
91-
92-
@Override
93-
public String toString() {
94-
return "Metadata{" +
95-
"receivedTimestamp=" + receivedTimestamp +
96-
", position=" + position +
97-
", endOffset=" + endOffset +
98-
'}';
99-
}
100-
}
101-
102-
private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
103-
final Map<TopicPartition, Metadata> metadata = new HashMap<>();
104-
for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
105-
metadata.put(
106-
entry.getKey(),
107-
new Metadata(
108-
entry.getValue().receivedTimestamp(),
109-
entry.getValue().position() == null ? null : entry.getValue().position().offset,
110-
entry.getValue().endOffset()
111-
)
112-
);
113-
}
114-
return metadata;
115-
}
116-
117-
public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
118-
this.records = records;
119-
this.metadata = new HashMap<>();
120-
}
121-
122-
public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
123-
final Map<TopicPartition, Metadata> metadata) {
39+
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
12440
this.records = records;
125-
this.metadata = metadata;
126-
}
127-
128-
ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
129-
this(fetchedRecords.records(), extractMetadata(fetchedRecords));
13041
}
13142

13243
/**
@@ -142,16 +53,6 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
14253
return Collections.unmodifiableList(recs);
14354
}
14455

145-
/**
146-
* Get the updated metadata returned by the brokers along with this record set.
147-
* May be empty or partial depending on the responses from the broker during this particular poll.
148-
* May also include metadata for additional partitions than the ones for which there are records
149-
* in this {@code ConsumerRecords} object.
150-
*/
151-
public Map<TopicPartition, Metadata> metadata() {
152-
return Collections.unmodifiableMap(metadata);
153-
}
154-
15556
/**
15657
* Get just the records for the given topic
15758
*/

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
2828
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
2929
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
30-
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
3130
import org.apache.kafka.clients.consumer.internals.Fetcher;
3231
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
3332
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -74,6 +73,7 @@
7473
import java.util.Map;
7574
import java.util.Objects;
7675
import java.util.Optional;
76+
import java.util.OptionalLong;
7777
import java.util.Properties;
7878
import java.util.Set;
7979
import java.util.concurrent.TimeUnit;
@@ -578,6 +578,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
578578
private final Deserializer<V> valueDeserializer;
579579
private final Fetcher<K, V> fetcher;
580580
private final ConsumerInterceptors<K, V> interceptors;
581+
private final IsolationLevel isolationLevel;
581582

582583
private final Time time;
583584
private final ConsumerNetworkClient client;
@@ -736,7 +737,7 @@ public KafkaConsumer(Map<String, Object> configs,
736737

737738
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
738739
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
739-
IsolationLevel isolationLevel = IsolationLevel.valueOf(
740+
this.isolationLevel = IsolationLevel.valueOf(
740741
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
741742
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
742743
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -849,6 +850,7 @@ public KafkaConsumer(Map<String, Object> configs,
849850
this.keyDeserializer = keyDeserializer;
850851
this.valueDeserializer = valueDeserializer;
851852
this.fetcher = fetcher;
853+
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
852854
this.interceptors = Objects.requireNonNull(interceptors);
853855
this.time = time;
854856
this.client = client;
@@ -1235,7 +1237,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
12351237
}
12361238
}
12371239

1238-
final FetchedRecords<K, V> records = pollForFetches(timer);
1240+
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
12391241
if (!records.isEmpty()) {
12401242
// before returning the fetched records, we can send off the next round of fetches
12411243
// and avoid block waiting for their responses to enable pipelining while the user
@@ -1269,12 +1271,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
12691271
/**
12701272
* @throws KafkaException if the rebalance callback throws exception
12711273
*/
1272-
private FetchedRecords<K, V> pollForFetches(Timer timer) {
1274+
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
12731275
long pollTimeout = coordinator == null ? timer.remainingMs() :
12741276
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
12751277

12761278
// if data is available already, return it immediately
1277-
final FetchedRecords<K, V> records = fetcher.fetchedRecords();
1279+
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
12781280
if (!records.isEmpty()) {
12791281
return records;
12801282
}
@@ -2219,6 +2221,30 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
22192221
}
22202222
}
22212223

2224+
/**
2225+
* Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
2226+
* for example if there is no position yet, or if the end offset is not known yet.
2227+
*
2228+
* <p>
2229+
* This method uses locally cached metadata and never makes a remote call.
2230+
*
2231+
* @param topicPartition The partition to get the lag for.
2232+
*
2233+
* @return This {@code Consumer} instance's current lag for the given partition.
2234+
*
2235+
* @throws IllegalStateException if the {@code topicPartition} is not assigned
2236+
**/
2237+
@Override
2238+
public OptionalLong currentLag(TopicPartition topicPartition) {
2239+
acquireAndEnsureOpen();
2240+
try {
2241+
final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
2242+
return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
2243+
} finally {
2244+
release();
2245+
}
2246+
}
2247+
22222248
/**
22232249
* Return the current group metadata associated with this consumer.
22242250
*

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,16 @@
3737
import java.util.List;
3838
import java.util.Map;
3939
import java.util.Optional;
40+
import java.util.OptionalLong;
4041
import java.util.Queue;
4142
import java.util.Set;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445
import java.util.regex.Pattern;
4546
import java.util.stream.Collectors;
4647

48+
import static java.util.Collections.singleton;
49+
4750

4851
/**
4952
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
@@ -218,21 +221,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
218221
}
219222

220223
toClear.forEach(p -> this.records.remove(p));
221-
222-
final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
223-
for (final TopicPartition partition : subscriptions.assignedPartitions()) {
224-
if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
225-
final SubscriptionState.FetchPosition position = subscriptions.position(partition);
226-
final long offset = position.offset;
227-
final long endOffset = endOffsets.get(partition);
228-
metadata.put(
229-
partition,
230-
new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
231-
);
232-
}
233-
}
234-
235-
return new ConsumerRecords<>(results, metadata);
224+
return new ConsumerRecords<>(results);
236225
}
237226

238227
public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -243,7 +232,6 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
243232
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
244233
List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
245234
recs.add(record);
246-
endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
247235
}
248236

249237
/**
@@ -318,7 +306,7 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
318306
@Deprecated
319307
@Override
320308
public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
321-
return committed(Collections.singleton(partition)).get(partition);
309+
return committed(singleton(partition)).get(partition);
322310
}
323311

324312
@Deprecated
@@ -556,6 +544,16 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
556544
return endOffsets(partitions);
557545
}
558546

547+
@Override
548+
public OptionalLong currentLag(TopicPartition topicPartition) {
549+
if (endOffsets.containsKey(topicPartition)) {
550+
return OptionalLong.of(endOffsets.get(topicPartition) - position(topicPartition));
551+
} else {
552+
// if the test doesn't bother to set an end offset, we assume it wants to model being caught up.
553+
return OptionalLong.of(0L);
554+
}
555+
}
556+
559557
@Override
560558
public ConsumerGroupMetadata groupMetadata() {
561559
return new ConsumerGroupMetadata("dummy.group.id", 1, "1", Optional.empty());

0 commit comments

Comments
 (0)