-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-12754: Improve endOffsets for TaskMetadata #10634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@rodesai @abbccdda @ableegoldman I had to make a couple changes to the task metadata to improve when the end offset was updated. Now we get it at poll phase which should give us the highest offset streams has seen at any point |
@wcarlson5 there's a failure that I'm guessing is related: |
@ableegoldman yep it looks like I am going to have to go back to the drawing board for either the test or the impl. I will look at it tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little out of the loop with the Task Metadata stuff, but if my understanding of things is correct then the issue with updating the end offsets is slightly different than what's addressed here. Basically we currently piggy-back on the StreamThread's #updateThreadMetadata
, but that's only invoked once when the thread goes from restoring to running. It seems like we should update the TaskMetadata separately, and more frequently, right? For example at each poll invocation. Or the TaskMetadata#endOffsets
could even just call through to each StreamTask#highWatermark for a fully up-to-date view of the end offsets, though the tradeoff is of course we make that call a bit more expensive. Thoughts?
Also, unrelated to your PR or this feature, it seems like the ThreadMetadata should have been updated at the very least any time the thread state changes, since the thread state is one of the essential pieces of the thread metadata...not sure why we would only update it when it goes to RUNNING, that seems to make the state field of the metadata pretty much useless. Maybe you can address that in your PR on the side (should be a one-line change in StreamThread#setState)
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Show resolved
Hide resolved
@ableegoldman Thanks for the review! I think I got to your comments. There was one miss understanding about how the |
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
Outdated
Show resolved
Hide resolved
TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 1L, "the record was processed"); | ||
process.set(true); | ||
|
||
produceMessages(0L, inputTopic, "test1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, is there a reason for producing the messages one at a time, rather than all together at the beginning and then relying on process
to gate when they can be processed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not really, I wrote the process gate first before I wanted to have more than one at a time. I think it makes the consumer have to poll multiple times as it alternates between the pollPhase and processing but I am not sure if that actually improves the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, can you just fix the error messages in the test? After that I'll go ahead and merge
@ableegoldman I think it ready :) |
Looks like just the usual flaky test failures in the build: |
…e-allocations-lz4 * apache-github/trunk: (155 commits) KAFKA-12728: Upgrade gradle to 7.0.2 and shadow to 7.0.0 (apache#10606) KAFKA-12778: Fix QuorumController request timeouts and electLeaders (apache#10688) KAFKA-12754: Improve endOffsets for TaskMetadata (apache#10634) Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10680) MINOR: set replication.factor to 1 to make StreamsBrokerCompatibilityService work with old broker (apache#10673) MINOR: prevent cleanup() from being called while Streams is still shutting down (apache#10666) KAFKA-8326: Introduce List Serde (apache#6592) KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (apache#10679) KAFKA-12648: MINOR - Add TopologyMetadata.Subtopology class for subtopology metadata (apache#10676) MINOR: Update jacoco to 0.8.7 for JDK 16 support (apache#10654) MINOR: exclude all `src/generated` and `src/generated-test` (apache#10671) KAFKA-12772: Move all transaction state transition rules into their states (apache#10667) KAFKA-12758 Added `server-common` module to have server side common classes. (apache#10638) MINOR Removed copying storage libraries specifically as they are already copied. (apache#10647) KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (apache#10657) KAFKA-12747: Fix flakiness in shouldReturnUUIDsWithStringPrefix (apache#10643) MINOR: remove unnecessary placeholder from WorkerSourceTask#recordSent (apache#10659) MINOR: Remove unused `scalatest` definition from `dependencies.gradle` (apache#10655) MINOR: checkstyle version upgrade: 8.20 -> 8.36.2 (apache#10656) KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (apache#10645) ...
Improve endOffsets for TaskMetadata also add an int test for TaskMetadata offset collections
Committer Checklist (excluded from commit message)