KAFKA-7396 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes #5551

Merged
merged 2 commits into from
Sep 11, 2018

Contributor

@joan38 joan38 commented Aug 22, 2018

We want to make sure that we always have a serde for all Materialized, Serialized, Joined, Consumed and Produced.
For that we can make use of the implicit parameters in Scala.

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde
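
The idea can be sketched without Kafka on the classpath. The block below is only an illustration, not the PR's code: `Serde`, `Consumed` and the two implicit instances are simplified, hypothetical stand-ins for the Kafka classes. It shows how a factory with an implicit parameter list picks up its serdes from scope, so a missing serde becomes a compile error rather than a runtime surprise.

```scala
object ImplicitSerdeSketch {
  // Hypothetical stand-ins for Kafka's Serde and Consumed (assumption: simplified shapes).
  final case class Serde[T](name: String)
  final case class Consumed[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  object Consumed {
    // No explicit arguments: both serdes are resolved from implicit scope.
    def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
      new Consumed(keySerde, valueSerde)
  }

  // Implicit instances that the compiler can find for String and Long.
  implicit val stringSerde: Serde[String] = Serde[String]("string")
  implicit val longSerde: Serde[Long] = Serde[Long]("long")

  // Compiles only because a Serde[String] and a Serde[Long] are in scope.
  val consumed: Consumed[String, Long] = Consumed.`with`[String, Long]

  def main(args: Array[String]): Unit =
    println(consumed)
}
```

Removing either implicit instance makes the `Consumed.with` call fail to compile, which is the guarantee the description is after.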

@joan38
Contributor Author

joan38 commented Aug 22, 2018

This one is ready for review @guozhangwang @vvcephei @ijuma @debasishg

Contributor

@guozhangwang guozhangwang left a comment

LGTM. @vvcephei could you take another look?

@debasishg
Contributor

LGTM .. should we also do the same for Serialized ?

@joan38
Contributor Author

joan38 commented Aug 23, 2018

@debasishg good point, even though I don't think we ever have to create a Serialized manually, do we?
I guess if we do Serialized, we should also do Consumed and Produced?

@joan38
Contributor Author

joan38 commented Aug 23, 2018

@mjsax Could you label this PR stream please?

@mjsax
Member

mjsax commented Aug 23, 2018

Wondering if this is a public API change? Do we need a KIP for this?

@guozhangwang
Contributor

I do not think it is a public API: users do not need to change any code they have if they do not want to, as this change is for allowing them to get some "syntax sugar".

@joan38 joan38 changed the title Materialized with implicit Serdes Materialized, Serialized, Consumed and Produced with implicit Serdes Aug 23, 2018
@joan38
Contributor Author

joan38 commented Aug 23, 2018

Ok I'm adding the Serialized, Consumed and Produced with implicit Serdes too

@joan38 joan38 force-pushed the implicit-serde-mater branch 2 times, most recently from fe842a0 to e72c98d Compare August 23, 2018 20:54
Contributor

@vvcephei vvcephei left a comment

Hey @joan38 , It looks good!

I think the sugar classes might benefit from scaladoc explaining what they are, what they do, and providing a few examples.


package object scala {
  type ByteArrayKeyValueStore = KeyValueStore[Bytes, Array[Byte]]
  type ByteArraySessionStore = SessionStore[Bytes, Array[Byte]]
  type ByteArrayWindowStore = WindowStore[Bytes, Array[Byte]]

  type Materialized[K, V, S <: StateStore] = org.apache.kafka.streams.kstream.Materialized[K, V, S]
Contributor

Can you explain why this is necessary? Everything seems to compile and run fine when I remove it.

Contributor Author

You must not have compiled the tests.
You can't import the type from Java and the object from Scala if they have the same name but live in different packages, unless you use the fully qualified name.
The type here does the import for you in the right package.
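
A minimal sketch of that aliasing trick, with nested objects standing in for the Java and Scala packages. All names here (`javaapi`, `scalaapi`, `named`, the two-parameter `Materialized`) are hypothetical simplifications and not the Kafka API. Types and terms live in separate namespaces in Scala, so an alias and an object can share a name and arrive through a single import.

```scala
object AliasSketch {
  // Stand-in for the Java package that owns the class (hypothetical, simplified).
  object javaapi {
    final class Materialized[K, V](val storeName: String)
  }

  // Stand-in for the Scala package: a type alias plus a factory object of the same name.
  object scalaapi {
    type Materialized[K, V] = javaapi.Materialized[K, V]

    object Materialized {
      // `named` is an illustrative factory, not a real Kafka method.
      def named[K, V](storeName: String): Materialized[K, V] =
        new javaapi.Materialized[K, V](storeName)
    }
  }

  // One import brings both the type and the object into scope.
  import scalaapi._

  val store: Materialized[String, Long] = Materialized.named[String, Long]("counts")

  def main(args: Array[String]): Unit =
    println(store.storeName)
}
```

Without the alias, the type annotation on `store` would need the fully qualified `javaapi.Materialized`.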

@vvcephei
Contributor

@guozhangwang, I agree that this is not changing or breaking any public APIs, but it seems like it is adding new public APIs. We can think of it as similar to adding new convenience methods to the existing builders. As such, it does seem like it needs a KIP.

@vvcephei
Contributor

@joan38 Also, you might already have this in the works, but I think we should keep up the trend of having test coverage on all these additions.

We initially didn't have enough respect for how hard it is to just read a scala DSL and visually verify that it'll behave as expected. It might be over-learning, but I'd feel better if we could see the expected usage in some tests.

@joan38 joan38 force-pushed the implicit-serde-mater branch from e72c98d to 97498e0 Compare August 24, 2018 02:00
@joan38
Contributor Author

joan38 commented Aug 24, 2018

As you expected I wrote the tests 😄 and ported the documentation.

@joan38 joan38 force-pushed the implicit-serde-mater branch from 97498e0 to b07bea9 Compare August 24, 2018 08:16
@joan38 joan38 changed the title Materialized, Serialized, Consumed and Produced with implicit Serdes Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes Aug 24, 2018
@joan38
Contributor Author

joan38 commented Aug 24, 2018

I forgot the Joined one

@joan38 joan38 force-pushed the implicit-serde-mater branch from b07bea9 to 529a02a Compare August 25, 2018 18:06
@joan38 joan38 changed the title Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes @joan38 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes Aug 25, 2018
@joan38
Contributor Author

joan38 commented Aug 25, 2018

@joan38 joan38 changed the title @joan38 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes KIP-365 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes Aug 25, 2018
@joan38 joan38 force-pushed the implicit-serde-mater branch from 529a02a to 32c2caa Compare August 25, 2018 18:46

  implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
-   Consumed.`with`(keySerde, valueSerde)
+   kstream.Consumed.`with`(keySerde, valueSerde)
Contributor

What's the purpose for prefixing with kstream ?

If I remove the prefix, the code still compiles.

Contributor Author

@joan38 joan38 Aug 26, 2018

Thanks, this was introduced by an IDE refactoring.

@@ -0,0 +1,83 @@
/*
* Copyright (C) 2018 Joan Goyeau.
Contributor

Please don't use individual's name.

Contributor Author

@joan38 joan38 Aug 26, 2018

Why is that?
I asked that earlier and it seemed ok: #5502 (comment)

Is there a special case for Alexis Seigneurin?:
https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala#L3

Sorry, I don't know much about licensing.

Member

The other code was released before it was merged into Apache Kafka. So the original copyright was with Alexis and Lightbend (i.e., Lightbend copied Alexis's code and later contributed it to AK).

For regular PRs, the contributor license agreement states that the ASF gets the copyright (from my understanding; also not a lawyer). Otherwise, it would also be a hassle to add a copyright to every file that is changed... Please read https://www.apache.org/licenses/icla.pdf (committers are required to file this); for contributors, consider the wiki information: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

When you contribute code, you affirm that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

* config will be used
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config
* will be used
* @tparam K key type
Contributor

It seems these two lines should be moved closer to line 31 where keySerde is documented

Contributor Author

These are type param documentation. They are usually after the parameters.

Contributor

From KStream:

 * @tparam K Type of keys
 * @tparam V Type of values
 * @param inner The underlying Java abstraction for KStream

Contributor Author

I guess it makes sense to move them up, since this is the order in which they appear in the signature. I will change that.

Contributor

I'd recommend we adopt whatever are normal scaladoc conventions. I think I'm responsible for the KStream doc, but I wasn't aware of a convention.

@@ -0,0 +1,44 @@
/*
* Copyright (C) 2018 Joan Goyeau.
Contributor

Drop author, please

Contributor Author

See reply above #5551 (comment)

@joan38 joan38 force-pushed the implicit-serde-mater branch 3 times, most recently from 95606b1 to 6666a53 Compare August 26, 2018 02:12
@joan38
Contributor Author

joan38 commented Aug 28, 2018

@tedyu Yes, I'm not sure we should move the tests into the internal package just for the sake of making it accessible.
@bbejeck To me it sounds like there should be no XInternal classes, just immutable ones, but that's a Scala guy talking.
I don't really know how we can restrict this and still be able to use it freely in tests.

@joan38 joan38 force-pushed the implicit-serde-mater branch from b4140d5 to c7db46b Compare August 28, 2018 22:15
@vvcephei
Contributor

One problem with Java is that the access control mechanisms are very coarse. Let's look at ProducedInternal. The reason it exists is to expose the keySerde(), valueSerde(), and streamPartitioner() getters. The Produced class intentionally doesn't expose the getters.

Regardless of whether Produced is immutable, if we add these getters to Produced, they would become part of the public API (which needs a KIP, etc).

It doesn't really matter whether ProducedInternal has a public constructor or not, since it's still in the internal package, which documents that it's "not a public interface".

It's better, of course, if we can avoid making the constructor public, since then the compiler backs up our package-based "this is not public" statement.

If this were scala, we could add private[streams] modifiers to these methods and just put them in Produced, but there's no way to do this in Java... c'est la vie.
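
For illustration, a minimal sketch of what qualified access would look like if the class were written in Scala. The names are hypothetical: a nested object called `streams` plays the role of the package, and `Produced` is reduced to a single field. This is not how the Java class is actually defined.

```scala
object AccessSketch {
  // `streams` stands in for the org.apache.kafka.streams package (assumption for the sketch).
  object streams {
    // The getter is visible anywhere inside `streams`, and nowhere else.
    final class Produced(private[streams] val keySerdeName: String)

    object internals {
      // Internal code can read the field without it becoming public API.
      def keySerdeOf(produced: Produced): String = produced.keySerdeName
    }
  }

  val produced = new streams.Produced("string")

  // produced.keySerdeName  // would not compile here: we are outside `streams`
  val viaInternals: String = streams.internals.keySerdeOf(produced)

  def main(args: Array[String]): Unit =
    println(viaInternals)
}
```

With this kind of modifier the compiler enforces the "not a public interface" boundary, so a separate `ProducedInternal`-style wrapper would not be needed.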

@joan38 joan38 force-pushed the implicit-serde-mater branch from c7db46b to 8e4777f Compare August 29, 2018 22:55
@joan38
Contributor Author

joan38 commented Aug 30, 2018

Do you know why this is not triggering Jenkins jobs?

@joan38 joan38 force-pushed the implicit-serde-mater branch from 8e4777f to a91cb32 Compare August 30, 2018 21:51
@vvcephei
Contributor

I'm not sure...

Retest this, please.

@joan38
Contributor Author

joan38 commented Aug 31, 2018

It worked 👍

@vvcephei
Contributor

Yeah, Jenkins must have fallen asleep. (or maybe there were no available workers or something)

@vvcephei
Contributor

It looks like the bounds may be backwards:

00:04:36.049 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala:60: type mismatch;
00:04:36.049  found   : org.apache.kafka.streams.processor.StreamPartitioner[_$1,_$2] where type _$2 <: V, type _$1 <: K
00:04:36.049  required: org.apache.kafka.streams.processor.StreamPartitioner[_ >: K, _ >: V]
00:04:36.049 Note: _$1 <: Any, but Java-defined trait StreamPartitioner is invariant in type K.
00:04:36.049 You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
00:04:36.049 Note: _$2 <: Any, but Java-defined trait StreamPartitioner is invariant in type V.
00:04:36.049 You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
00:04:36.049     ProducedJ.`with`(keySerde, valueSerde, partitioner)

@joan38
Contributor Author

joan38 commented Aug 31, 2018

Indeed, I will correct this.

@joan38 joan38 force-pushed the implicit-serde-mater branch from a91cb32 to 3ee9b4f Compare August 31, 2018 23:00
@joan38
Contributor Author

joan38 commented Aug 31, 2018

@vvcephei I just corrected it and now see:

Error:(60, 22) type mismatch;
 found   : org.apache.kafka.common.serialization.Serde[K]
 required: org.apache.kafka.common.serialization.Serde[_$1] where type _$1 >: K
Note: K <: _$1, but Java-defined trait Serde is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: _$1`. (SLS 3.2.10)
    ProducedJ.`with`(keySerde, valueSerde, partitioner)

Again, I don't understand how we can have K for one part of the function and _ >: K for the other part. To me it should all be the same type.
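
One way to see how both can coexist, as a sketch under assumptions: the "required" line in the first log suggests the Java method takes the partitioner with lower-bounded wildcards (the Java `? super K` form) while the serdes stay exactly `K` and `V`. `Partitioner` and `choose` below are hypothetical stand-ins, not the Kafka types. A consumer of keys may safely accept a supertype of `K`, which is why only that parameter gets the `_ >: K` bound.

```scala
object WildcardSketch {
  // Hypothetical invariant partitioner, standing in for the Java-defined StreamPartitioner.
  trait Partitioner[K, V] {
    def partition(key: K, value: V, numPartitions: Int): Int
  }

  // Mirrors a Java parameter of the form Partitioner<? super K, ? super V>:
  // anything that can partition a supertype of K and V is accepted.
  def choose[K, V](key: K, value: V, partitioner: Partitioner[_ >: K, _ >: V]): Int =
    partitioner.partition(key, value, 4)

  // A partitioner over Any works for every key and value type.
  val byHash: Partitioner[Any, Any] = new Partitioner[Any, Any] {
    def partition(key: Any, value: Any, numPartitions: Int): Int =
      math.abs(key.hashCode % numPartitions)
  }

  // K and V stay String and Long, while the partitioner is typed on supertypes.
  val chosen: Int = choose[String, Long]("region", 1L, byHash)

  def main(args: Array[String]): Unit =
    println(chosen)
}
```

Declaring the parameter with upper bounds (`_ <: K`) instead would reject `byHash`, which matches the direction of the first type mismatch reported by Jenkins.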

def `with`[K, V](
  timestampExtractor: TimestampExtractor,
  resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
Contributor

For my own education: why do we only need implicit on the first parameter, instead of having it on both the key and value serdes?

Contributor Author

The implicit keyword applies to the whole set of parameters, not just to the first parameter.

Contributor Author

@joan38 joan38 Sep 5, 2018

@guozhangwang you are right that the declaration syntax of implicit parameters in Scala is confusing.
See this proposal scala/scala3#1260 for Scala 3:

The syntax (implicit x: T, y: U) is a bit strange in that implicit conceptually scopes over
x and y but looks like a modifier for just x.

That would make it clear that the implicit is applied to the whole group of curried parameters.
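
A small sketch of that scoping rule, using a hypothetical `Serde` stand-in and an illustrative `describe` function (neither is Kafka code). The single `implicit` keyword covers both parameters of the second list, so the compiler fills in both, and a caller who wants to override one must pass the whole list explicitly.

```scala
object ImplicitListSketch {
  // Hypothetical stand-in for Kafka's Serde.
  final case class Serde[T](name: String)

  // `implicit` marks the whole second parameter list: keySerde AND valueSerde.
  def describe[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
    s"$topic: ${keySerde.name} -> ${valueSerde.name}"

  implicit val stringSerde: Serde[String] = Serde[String]("string")
  implicit val longSerde: Serde[Long] = Serde[Long]("long")

  // Both serdes are resolved implicitly.
  val resolved: String = describe[String, Long]("clicks")

  // Passing the list explicitly means supplying every parameter in it.
  val overridden: String =
    describe[String, Long]("clicks")(Serde[String]("custom-key"), longSerde)

  def main(args: Array[String]): Unit = {
    println(resolved)   // clicks: string -> long
    println(overridden) // clicks: custom-key -> long
  }
}
```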

@@ -180,7 +191,7 @@ class TopologyTest extends JUnitSuite {

// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-   .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
+   .groupByKey(kstream.Serialized.`with`[String, JLong])
Contributor

Why do we need the kstream. prefix here?

Contributor Author

Wrong refactoring from my IDE. I will correct this.

@joan38 joan38 force-pushed the implicit-serde-mater branch 2 times, most recently from 92f9228 to a019556 Compare September 4, 2018 23:03
@joan38
Contributor Author

joan38 commented Sep 7, 2018

Are we good with this?

@vvcephei
Contributor

vvcephei commented Sep 7, 2018

Hey @joan38 ,

You need to close the vote email thread and update the KIP status. I sent a message with the next steps to the vote thread for KIP-365.

val consumed: Consumed[String, Long] = Consumed.`with`[String, Long]

val internalConsumed = new ConsumedInternal(consumed)
internalConsumed.keySerde shouldBe Serdes.String
Contributor

Gah. I just now realized that it's not ok to be caching these serdes. It's not part of your change, though. I'll send a separate PR to fix it.

Contributor

just FYI: https://issues.apache.org/jira/browse/KAFKA-7386 (I've assigned it to myself)

Contributor

@vvcephei vvcephei left a comment

LGTM! Thanks!

We can merge as soon as the KIP is finalized. /cc @guozhangwang

@mjsax
Member

mjsax commented Sep 11, 2018

Btw: all KIPs should be backed by a Jira, and the PR should have the ticket number in the title. Please update the PR and KIP accordingly after you have created the Jira.

Also, this PR seems to need a rebase. Thx.

@joan38 joan38 force-pushed the implicit-serde-mater branch from a019556 to 5ba0eb5 Compare September 11, 2018 08:04
@joan38 joan38 changed the title KIP-365 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes KAFKA-7396 Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes Sep 11, 2018
@joan38
Contributor Author

joan38 commented Sep 11, 2018

@mjsax @vvcephei All done.
Should we go ahead and merge?

@guozhangwang guozhangwang merged commit acd3858 into apache:trunk Sep 11, 2018
@joan38 joan38 deleted the implicit-serde-mater branch September 12, 2018 07:20
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…ith implicit Serdes (apache#5551)

Reviewers: John Roesler <[email protected]>, Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Ted Yu <[email protected]>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Labels
kip Requires or implements a KIP streams
7 participants