@@ -178,17 +178,17 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
178
178
179
179
int minQuota = (int ) Math .floor (((double ) totalPartitionsCount ) / numberOfConsumers );
180
180
int maxQuota = (int ) Math .ceil (((double ) totalPartitionsCount ) / numberOfConsumers );
181
- // the expected number of members with maxQuota assignment
182
- int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers ;
183
- // the number of members with exactly maxQuota partitions assigned
184
- int numMembersHavingMorePartitions = 0 ;
181
+ // the expected number of members with over minQuota assignment
182
+ int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers ;
183
+ // the number of members with over minQuota partitions assigned
184
+ int numMembersAssignedOverMinQuota = 0 ;
185
185
186
186
// initialize the assignment map with an empty array of size maxQuota for all members
187
187
Map <String , List <TopicPartition >> assignment = new HashMap <>(
188
188
consumerToOwnedPartitions .keySet ().stream ().collect (Collectors .toMap (c -> c , c -> new ArrayList <>(maxQuota ))));
189
189
190
190
List <TopicPartition > assignedPartitions = new ArrayList <>();
191
- // Reassign as many previously owned partitions as possible
191
+ // Reassign previously owned partitions to the expected number
192
192
for (Map .Entry <String , List <TopicPartition >> consumerEntry : consumerToOwnedPartitions .entrySet ()) {
193
193
String consumer = consumerEntry .getKey ();
194
194
List <TopicPartition > ownedPartitions = consumerEntry .getValue ();
@@ -203,10 +203,10 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
203
203
assignedPartitions .addAll (ownedPartitions );
204
204
}
205
205
unfilledMembers .add (consumer );
206
- } else if (ownedPartitions .size () >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ) {
207
- // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
208
- // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
209
- numMembersHavingMorePartitions ++;
206
+ } else if (ownedPartitions .size () >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ) {
207
+ // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
208
+ // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
209
+ numMembersAssignedOverMinQuota ++;
210
210
List <TopicPartition > maxQuotaPartitions = ownedPartitions .subList (0 , maxQuota );
211
211
consumerAssignment .addAll (maxQuotaPartitions );
212
212
assignedPartitions .addAll (maxQuotaPartitions );
@@ -218,8 +218,10 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
218
218
consumerAssignment .addAll (minQuotaPartitions );
219
219
assignedPartitions .addAll (minQuotaPartitions );
220
220
allRevokedPartitions .addAll (ownedPartitions .subList (minQuota , ownedPartitions .size ()));
221
- // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
222
- if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ) {
221
+ // this consumer is potential maxQuota candidate since we're still under the number of expected members
222
+ // with more than the minQuota partitions. Note, if the number of expected members with more than
223
+ // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
224
+ if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ) {
223
225
unfilledMembers .add (consumer );
224
226
}
225
227
}
@@ -242,6 +244,9 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
242
244
if (unfilledMembers .isEmpty ()) {
243
245
// Should not enter here since we have calculated the exact number to assign to each consumer
244
246
// There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
247
+ int currentPartitionIndex = unassignedPartitions .indexOf (unassignedPartition );
248
+ log .error ("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}" ,
249
+ unassignedPartitions .subList (currentPartitionIndex , unassignedPartitions .size ()));
245
250
throw new IllegalStateException ("No more unfilled consumers to be assigned." );
246
251
}
247
252
unfilledConsumerIter = unfilledMembers .iterator ();
@@ -255,27 +260,35 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
255
260
partitionsTransferringOwnership .put (unassignedPartition , consumer );
256
261
257
262
int currentAssignedCount = consumerAssignment .size ();
258
- int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota ;
263
+ int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota ;
259
264
if (currentAssignedCount == expectedAssignedCount ) {
260
265
if (currentAssignedCount == maxQuota ) {
261
- numMembersHavingMorePartitions ++;
266
+ numMembersAssignedOverMinQuota ++;
262
267
}
263
268
unfilledConsumerIter .remove ();
264
269
}
265
270
}
266
271
267
272
if (!unfilledMembers .isEmpty ()) {
268
- // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
269
- // of max capacity members. Otherwise, there must be error here.
270
- if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions ) {
271
- throw new IllegalStateException (String .format ("We haven't reached the allowed number of max capacity members, " +
272
- "but no more partitions to be assigned to unfilled consumers: %s" , unfilledMembers ));
273
+ // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
274
+ // of members with more than the minQuota partitions. Otherwise, there must be error here.
275
+ if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota ) {
276
+ log .error ("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
277
+ "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}" ,
278
+ numMembersAssignedOverMinQuota , expectedNumMembersAssignedOverMinQuota , unfilledMembers );
279
+ throw new IllegalStateException ("We haven't reached the expected number of members with " +
280
+ "more than the minQuota partitions, but no more partitions to be assigned" );
273
281
} else {
274
282
for (String unfilledMember : unfilledMembers ) {
275
283
int assignedPartitionsCount = assignment .get (unfilledMember ).size ();
276
284
if (assignedPartitionsCount != minQuota ) {
277
- throw new IllegalStateException (String .format ("Consumer: [%s] should have %d partitions, but got %d partitions, " +
278
- "and no more partitions to be assigned" , unfilledMember , minQuota , assignedPartitionsCount ));
285
+ log .error ("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
286
+ "to be assigned. The remaining unfilled consumers are: {}" , unfilledMember , minQuota , assignedPartitionsCount , unfilledMembers );
287
+ throw new IllegalStateException (String .format ("Consumer: [%s] doesn't reach minQuota partitions, " +
288
+ "and no more partitions to be assigned" , unfilledMember ));
289
+ } else {
290
+ log .trace ("skip over this unfilled member: [{}] because we've reached the expected number of " +
291
+ "members with more than the minQuota partitions, and this member already have minQuota partitions" , unfilledMember );
279
292
}
280
293
}
281
294
}
0 commit comments