Skip to content

KAFKA-12464: follow up PR to refactor codes and add logs #10645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>

int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
// the expected number of members with maxQuota assignment
int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
// the number of members with exactly maxQuota partitions assigned
int numMembersHavingMorePartitions = 0;
// the expected number of members with over minQuota assignment
int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers;
// the number of members with over minQuota partitions assigned
int numMembersAssignedOverMinQuota = 0;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit -- and to clarify up front, if you agree with this let's still hold off on doing it here so this PR can finally be merged, as I figure any nits can be addressed in your general assign PR:

It's still a bit unclear what this value will be sued for when you first see it, maybe we can work in the word minQuota somewhere in the name? Eg expectedNumMembersWithMoreThanMinQuotaPartitions, or for a slightly shorter example numConsumersAssignedOverMinQuota, or something between or similar to those

FYI I'm also ok with it as-is if you prefer the current name -- just wanted to throw out some other suggestions. I'll trust you to pick whatever name feels right

Make sense! I choose expectedNumMembersAssignedOverMinQuota and numMembersAssignedOverMinQuota. :)


// initialize the assignment map with an empty array of size maxQuota for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));

List<TopicPartition> assignedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
// Reassign previously owned partitions to the expected number
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
Expand All @@ -203,10 +203,10 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
assignedPartitions.addAll(ownedPartitions);
}
unfilledMembers.add(consumer);
} else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
numMembersHavingMorePartitions++;
} else if (ownedPartitions.size() >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
// with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
numMembersAssignedOverMinQuota++;
List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
assignedPartitions.addAll(maxQuotaPartitions);
Expand All @@ -218,8 +218,10 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
consumerAssignment.addAll(minQuotaPartitions);
assignedPartitions.addAll(minQuotaPartitions);
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
// this consumer is potential maxQuota candidate since we're still under the number of expected members
// with more than the minQuota partitions. Note, if the number of expected members with more than
// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (again, please address this in the other PR so I can merge this one): as Guozhang pointed out in another comment, in the case minQuota == maxQuota, this comment is a bit misleading as the number of expected max capacity members is technically all of them, but the variable expectedNumMembersHavingMorePartitions refers to the number of members who have more than the minQuota number of partitions, which in that case would actually be zero.

Agree! I refer to your suggested change except this consumer may be assigned one more partition (explain below), and add this line:

Note, if the number of expected members with more than the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers

This should make it more clear.

Just a thought: technically it's not even a "potential maxQuota" member, since as you pointed out in another comment "the unassignedPartitions size will always >= unfilledMembers size" -- therefore anything in unfilledMembers will in fact need to receive at least one partition. Does that sound right to you? (this is just a followup question to make sure we're on the same page, no need to do anything for this one)

Not excatly. After what we've changed to add potential maxQuota members into unfilledMembers, the unassignedPartitions size will not always >= unfilledMembers size. There will be cases that the unfilledMembers won't get any additional partition. Here's the example (also in new added tests)
2 topics: t1, t2
total partitions: t1p0, t1p1, t2p0, t2p1, t2p2 ==> 5 partitions
current assignment for c1, c2: (the partition t2p0 was assigning to c3, but c3 dropped)
c1: t1p0, t2p1
c2: t1p1, t2p2

In this situation, the maxQuota is 3, minQuota is 2, expectedNumMembersAssignedOverMinQuota is 1. so, after 1st reassign previously owned partitions phase, the numMembersAssignedOverMinQuota is still 0, unfilledMembers will be [c1, c2], and unassignedPartitions will be [t2p0]. After 2nd phase, only c1 will get 1 partition assigned.

unfilledMembers.add(consumer);
}
}
Expand All @@ -242,6 +244,9 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
if (unfilledMembers.isEmpty()) {
// Should not enter here since we have calculated the exact number to assign to each consumer
// There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (for next PR): can you log an error before throwing the exception and include the set of unassigned partitions? Either just print out the unassignedPartitions along with the current partition being processed so you can figure out which partitions are remaining after that, or else by actually computing the remaining partitions that have yet to be assigned. Since it's an error case, I think it's ok to spend a little extra time computing that for better debuggability

Make sense. Added!

throw new IllegalStateException("No more unfilled consumers to be assigned.");
}
unfilledConsumerIter = unfilledMembers.iterator();
Expand All @@ -255,27 +260,35 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
partitionsTransferringOwnership.put(unassignedPartition, consumer);

int currentAssignedCount = consumerAssignment.size();
int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
if (currentAssignedCount == expectedAssignedCount) {
if (currentAssignedCount == maxQuota) {
numMembersHavingMorePartitions++;
numMembersAssignedOverMinQuota++;
}
unfilledConsumerIter.remove();
}
}

if (!unfilledMembers.isEmpty()) {
// we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
// of max capacity members. Otherwise, there must be error here.
if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
"but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
// we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
// of members with more than the minQuota partitions. Otherwise, there must be error here.
if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
"of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
throw new IllegalStateException("We haven't reached the expected number of members with " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same here, can you log an error with the remaining unfilledMembers? I know you already do that in the exception message, but imo it would be better to print in a log message instead of an exception, as it may be long

I included most info in the error log, and just put simple error message in exception.

"more than the minQuota partitions, but no more partitions to be assigned");
} else {
for (String unfilledMember : unfilledMembers) {
int assignedPartitionsCount = assignment.get(unfilledMember).size();
if (assignedPartitionsCount != minQuota) {
throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
"and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
"to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the exception here looks good, but once again let's also log an error (it just makes it easier to debug when you have something concrete in the place you encountered the error, whereas exceptions are not always printed right away). Should probably just log any info that could be useful, such as all remaining unfilledMembers

I included most info in the error log, and just put simple error message in exception.

"and no more partitions to be assigned", unfilledMember));
} else {
log.trace("skip over this unfilled member: [{}] because we've reached the expected number of " +
"members with more than the minQuota partitions, and this member already have minQuota partitions", unfilledMember);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add an else case that just logs that we skipped over this member because we reached max capacity and it was still at min? Not sure if debug or trace is more appropriate, might be worth just running the tests with this log in place to see how often it gets printed

I put in trace level. After running all tests, there are 6 out of 27 tests will print this log.

}
}
}
Expand Down