Skip to content

Commit acd3858

Browse files
joan38guozhangwang
authored andcommitted
KAFKA-7396: Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes (#5551)
We want to make sure that we always have a serde for all Materialized, Serialized, Joined, Consumed and Produced. For that we can make use of the implicit parameters in Scala. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde Reviewers: John Roesler <[email protected]>, Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Ted Yu <[email protected]>
1 parent e2ec2d7 commit acd3858

23 files changed

+676
-36
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
7171
* @param valueSerde Serde to use for serializing the value
7272
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
7373
* if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
74-
* {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} wil be used
74+
* {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner}
75+
* will be used
7576
* @param <K> key type
7677
* @param <V> value type
7778
* @return A new {@link Produced} instance configured with keySerde, valueSerde, and partitioner

streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.kafka.streams.processor.StreamPartitioner;
2222

2323
public class ProducedInternal<K, V> extends Produced<K, V> {
24-
ProducedInternal(final Produced<K, V> produced) {
24+
public ProducedInternal(final Produced<K, V> produced) {
2525
super(produced);
2626
}
2727

streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@
1919
import org.apache.kafka.common.serialization.Serde;
2020
import org.apache.kafka.streams.kstream.Serialized;
2121

22-
class SerializedInternal<K, V> extends Serialized<K, V> {
23-
SerializedInternal(final Serialized<K, V> serialized) {
22+
public class SerializedInternal<K, V> extends Serialized<K, V> {
23+
public SerializedInternal(final Serialized<K, V> serialized) {
2424
super(serialized);
2525
}
2626

27-
Serde<K> keySerde() {
27+
public Serde<K> keySerde() {
2828
return keySerde;
2929
}
3030

31-
Serde<V> valueSerde() {
31+
public Serde<V> valueSerde() {
3232
return valueSerde;
3333
}
34-
3534
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@ import org.apache.kafka.streams.kstream.{
2525
KStream => KStreamJ,
2626
KTable => KTableJ,
2727
SessionWindowedKStream => SessionWindowedKStreamJ,
28-
TimeWindowedKStream => TimeWindowedKStreamJ,
29-
_
28+
TimeWindowedKStream => TimeWindowedKStreamJ
3029
}
3130
import org.apache.kafka.streams.scala.kstream._
3231
import org.apache.kafka.streams.KeyValue
3332
import org.apache.kafka.common.serialization.Serde
34-
import scala.language.implicitConversions
3533

34+
import scala.language.implicitConversions
3635
import org.apache.kafka.streams.processor.StateStore
3736

3837
/**
@@ -65,20 +64,20 @@ object ImplicitConversions {
6564
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
6665

6766
implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
68-
Serialized.`with`(keySerde, valueSerde)
67+
Serialized.`with`[K, V]
6968

7069
implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
71-
Consumed.`with`(keySerde, valueSerde)
70+
Consumed.`with`[K, V]
7271

7372
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
74-
Produced.`with`(keySerde, valueSerde)
73+
Produced.`with`[K, V]
7574

7675
implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
7776
valueSerde: Serde[V]): Materialized[K, V, S] =
78-
Materialized.`with`[K, V, S](keySerde, valueSerde)
77+
Materialized.`with`[K, V, S]
7978

8079
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
8180
valueSerde: Serde[V],
8281
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
83-
Joined.`with`(keySerde, valueSerde, otherValueSerde)
82+
Joined.`with`[K, V, VO]
8483
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.kafka.streams.scala
2121

2222
import java.util.regex.Pattern
2323

24-
import org.apache.kafka.streams.kstream.{Consumed, GlobalKTable, Materialized}
24+
import org.apache.kafka.streams.kstream.GlobalKTable
2525
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
2626
import org.apache.kafka.streams.state.StoreBuilder
2727
import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.scala.kstream
18+
19+
import org.apache.kafka.common.serialization.Serde
20+
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
21+
import org.apache.kafka.streams.Topology
22+
import org.apache.kafka.streams.processor.TimestampExtractor
23+
24+
object Consumed {
25+
26+
/**
27+
* Create an instance of [[Consumed]] with the supplied arguments. `null` values are acceptable.
28+
*
29+
* @tparam K key type
30+
* @tparam V value type
31+
* @param timestampExtractor the timestamp extractor to used. If `null` the default timestamp extractor from
32+
* config will be used
33+
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config
34+
* will be used
35+
* @param keySerde the key serde to use.
36+
* @param valueSerde the value serde to use.
37+
* @return a new instance of [[Consumed]]
38+
*/
39+
def `with`[K, V](
40+
timestampExtractor: TimestampExtractor,
41+
resetPolicy: Topology.AutoOffsetReset
42+
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
43+
ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
44+
45+
/**
46+
* Create an instance of [[Consumed]] with key and value [[Serde]]s.
47+
*
48+
* @tparam K key type
49+
* @tparam V value type
50+
* @return a new instance of [[Consumed]]
51+
*/
52+
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
53+
ConsumedJ.`with`(keySerde, valueSerde)
54+
55+
/**
56+
* Create an instance of [[Consumed]] with a [[TimestampExtractor]].
57+
*
58+
* @param timestampExtractor the timestamp extractor to used. If `null` the default timestamp extractor from
59+
* config will be used
60+
* @tparam K key type
61+
* @tparam V value type
62+
* @return a new instance of [[Consumed]]
63+
*/
64+
def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
65+
valueSerde: Serde[V]): ConsumedJ[K, V] =
66+
ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
67+
68+
/**
69+
* Create an instance of [[Consumed]] with a [[Topology.AutoOffsetReset]].
70+
*
71+
* @tparam K key type
72+
* @tparam V value type
73+
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
74+
* @return a new instance of [[Consumed]]
75+
*/
76+
def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
77+
valueSerde: Serde[V]): ConsumedJ[K, V] =
78+
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
79+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.scala.kstream
18+
19+
import org.apache.kafka.common.serialization.Serde
20+
import org.apache.kafka.streams.kstream.{Joined => JoinedJ}
21+
22+
object Joined {
23+
24+
/**
25+
* Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]]
26+
* instances.
27+
* `null` values are accepted and will be replaced by the default serdes as defined in config.
28+
*
29+
* @tparam K key type
30+
* @tparam V value type
31+
* @tparam VO other value type
32+
* @param keySerde the key serde to use.
33+
* @param valueSerde the value serde to use.
34+
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
35+
* @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
36+
*/
37+
def `with`[K, V, VO](implicit keySerde: Serde[K],
38+
valueSerde: Serde[V],
39+
otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
40+
JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
41+
42+
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
2727
/**
2828
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
2929
*
30-
* @param [K] Type of keys
31-
* @param [V] Type of values
30+
* @tparam K Type of keys
31+
* @tparam V Type of values
3232
* @param inner The underlying Java abstraction for KGroupedStream
3333
*
3434
* @see `org.apache.kafka.streams.kstream.KGroupedStream`

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
2727
/**
2828
* Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.
2929
*
30-
* @param [K] Type of keys
31-
* @param [V] Type of values
30+
* @tparam K Type of keys
31+
* @tparam V Type of values
3232
* @param inner The underlying Java abstraction for KGroupedTable
3333
*
3434
* @see `org.apache.kafka.streams.kstream.KGroupedTable`
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.scala.kstream
18+
19+
import org.apache.kafka.common.serialization.Serde
20+
import org.apache.kafka.streams.kstream.{Materialized => MaterializedJ}
21+
import org.apache.kafka.streams.processor.StateStore
22+
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, ByteArraySessionStore, ByteArrayWindowStore}
23+
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, SessionBytesStoreSupplier, WindowBytesStoreSupplier}
24+
25+
object Materialized {
26+
27+
/**
28+
* Materialize a [[StateStore]] with the provided key and value [[Serde]]s.
29+
* An internal name will be used for the store.
30+
*
31+
* @tparam K key type
32+
* @tparam V value type
33+
* @tparam S store type
34+
* @param keySerde the key [[Serde]] to use.
35+
* @param valueSerde the value [[Serde]] to use.
36+
* @return a new [[Materialized]] instance with the given key and value serdes
37+
*/
38+
def `with`[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] =
39+
MaterializedJ.`with`(keySerde, valueSerde)
40+
41+
/**
42+
* Materialize a [[StateStore]] with the given name.
43+
*
44+
* @tparam K key type of the store
45+
* @tparam V value type of the store
46+
* @tparam S type of the [[StateStore]]
47+
* @param storeName the name of the underlying [[org.apache.kafka.streams.scala.kstream.KTable]] state store;
48+
* valid characters are ASCII alphanumerics, '.', '_' and '-'.
49+
* @param keySerde the key serde to use.
50+
* @param valueSerde the value serde to use.
51+
* @return a new [[Materialized]] instance with the given storeName
52+
*/
53+
def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K],
54+
valueSerde: Serde[V]): MaterializedJ[K, V, S] =
55+
MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
56+
57+
/**
58+
* Materialize a [[org.apache.kafka.streams.state.WindowStore]] using the provided [[WindowBytesStoreSupplier]].
59+
*
60+
* Important: Custom subclasses are allowed here, but they should respect the retention contract:
61+
* Window stores are required to retain windows at least as long as (window size + window grace period).
62+
* Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract.
63+
*
64+
* @tparam K key type of the store
65+
* @tparam V value type of the store
66+
* @param supplier the [[WindowBytesStoreSupplier]] used to materialize the store
67+
* @param keySerde the key serde to use.
68+
* @param valueSerde the value serde to use.
69+
* @return a new [[Materialized]] instance with the given supplier
70+
*/
71+
def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K],
72+
valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
73+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
74+
75+
/**
76+
* Materialize a [[org.apache.kafka.streams.state.SessionStore]] using the provided [[SessionBytesStoreSupplier]].
77+
*
78+
* Important: Custom subclasses are allowed here, but they should respect the retention contract:
79+
* Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
80+
* Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract.
81+
*
82+
* @tparam K key type of the store
83+
* @tparam V value type of the store
84+
* @param supplier the [[SessionBytesStoreSupplier]] used to materialize the store
85+
* @param keySerde the key serde to use.
86+
* @param valueSerde the value serde to use.
87+
* @return a new [[Materialized]] instance with the given supplier
88+
*/
89+
def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde: Serde[K],
90+
valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
91+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
92+
93+
/**
94+
* Materialize a [[org.apache.kafka.streams.state.KeyValueStore]] using the provided [[KeyValueBytesStoreSupplier]].
95+
*
96+
* @tparam K key type of the store
97+
* @tparam V value type of the store
98+
* @param supplier the [[KeyValueBytesStoreSupplier]] used to materialize the store
99+
* @param keySerde the key serde to use.
100+
* @param valueSerde the value serde to use.
101+
* @return a new [[Materialized]] instance with the given supplier
102+
*/
103+
def as[K, V](
104+
supplier: KeyValueBytesStoreSupplier
105+
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayKeyValueStore] =
106+
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
107+
}

0 commit comments

Comments
 (0)