buffer() using TimeBasedChunks results in duplicates being sent onNext #429

Closed
elandau opened this issue Oct 10, 2013 · 6 comments
@elandau

elandau commented Oct 10, 2013

This seems to be isolated to the first chunk.

    @Test
    public void testDuplicate() throws Exception {
        final Set<Integer> unique = new HashSet<Integer>();
        Observable.create(new OnSubscribeFunc<Integer>() {
            @Override
            public Subscription onSubscribe(final Observer<? super Integer> t1) {
                final Future<?> t = Executors.newSingleThreadExecutor().submit(new Runnable() {
                    @Override
                    public void run() {
                        int count = 0;
                        for (int i = 0; i < 11; i++) {
                            t1.onNext(count++);
                            try {
                                Thread.sleep(10);
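                            } catch (InterruptedException e) {
                                // Cancelled via unsubscribe(): restore the interrupt flag and stop emitting.
                                Thread.currentThread().interrupt();
                                return;
                            }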
                        }
                    }
                });

                return new Subscription() {
                    @Override
                    public void unsubscribe() {
                        t.cancel(true);
                    }
                };
            }
        })
        .buffer(100, 100, TimeUnit.MILLISECONDS) 
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> t1) {
                if (t1.isEmpty()) 
                    return;

                LOG.info(t1.toString());
                for (Integer i : t1) {
                    synchronized (unique) {
                        if (unique.contains(i)) {
                            LOG.error("Duplicate for " + i);
                        }
                        Assert.assertFalse(unique.contains(i));
                        unique.add(i);
                    }
                }
            }
        });

        Thread.sleep(10000);

    }
@zsxwing
Member

zsxwing commented Oct 12, 2013

I don't think this is a bug. I suppose you expect something like this:

create a chunk --> sleep 100ms --> emit the chunk --> create a new chunk

However, because emitting the chunk and creating a new chunk may run on different threads, their order is not deterministic. So if you use buffer(100, 100, TimeUnit.MILLISECONDS), something like this may happen:

create a chunk --> sleep 100ms --> create a new chunk --> emit the old chunk

In that case, until the old chunk is emitted, both the old and the new chunk receive every value that arrives after the new chunk is created.

I don't think this is a bug because TimeBasedChunks is an OverlappingChunks, which allows this kind of nondeterminism.
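
The two orderings described above can be replayed deterministically without any threads. The following is only a sketch in plain Java: `ChunkRaceSketch` and its helper methods are hypothetical names invented for illustration, not RxJava's actual chunk implementation. It assumes every incoming value is appended to all chunks that are currently open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an operator that keeps a list of open chunks.
// This is NOT RxJava code; it only replays the two orderings by hand.
class ChunkRaceSketch {

    // Opens a new, empty chunk and registers it as open.
    static List<Integer> open(List<List<Integer>> openChunks) {
        List<Integer> chunk = new ArrayList<>();
        openChunks.add(chunk);
        return chunk;
    }

    // Appends a value to every chunk that is currently open.
    static void push(List<List<Integer>> openChunks, int value) {
        for (List<Integer> chunk : openChunks) {
            chunk.add(value);
        }
    }

    // Closes a chunk (matched by identity, not by contents) and records it as emitted.
    static void emit(List<List<Integer>> openChunks, List<List<Integer>> emitted, List<Integer> chunk) {
        openChunks.removeIf(c -> c == chunk);
        emitted.add(chunk);
    }

    // Replays one of the two orderings and returns the emitted chunks.
    static List<List<Integer>> run(boolean emitBeforeCreate) {
        List<List<Integer>> openChunks = new ArrayList<>();
        List<List<Integer>> emitted = new ArrayList<>();

        List<Integer> first = open(openChunks);
        push(openChunks, 0);
        push(openChunks, 1);

        List<Integer> second;
        if (emitBeforeCreate) {
            // Expected order: emit the old chunk, then create the new one.
            emit(openChunks, emitted, first);
            second = open(openChunks);
            push(openChunks, 2);
        } else {
            // Racy order: the new chunk exists before the old one is emitted,
            // so a value arriving in between lands in both.
            second = open(openChunks);
            push(openChunks, 2);
            emit(openChunks, emitted, first);
        }

        push(openChunks, 3);
        emit(openChunks, emitted, second);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [[0, 1], [2, 3]]
        System.out.println(run(false));  // [[0, 1, 2], [2, 3]]
    }
}
```

In the second ordering the value 2 appears in both chunks, which is the kind of duplicate the original test reports.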

@benjchristensen
Member

What's the status of this being a bug or not? @headinthebox Can you confirm?

@benjchristensen
Member

If I change .buffer(100, 100, TimeUnit.MILLISECONDS) to .buffer(100, TimeUnit.MILLISECONDS) then I don't see overlaps when I run it many times.

With 100, 100 I could agree that it is a valid race condition, but I'm also seeing it with 100, 101, and that doesn't make sense.

.buffer(100, 101, TimeUnit.MILLISECONDS)

With that I see this:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[9, 10]
Duplicate for 9

That is not expected, so I tend to agree that we have a bug.

The second buffer starting at 101 should not have a value emitted during the first 100ms.
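
To make that expectation concrete, here is a small sketch that computes which buffers a value should land in, given only its emission time. It assumes idealized timing and that buffer k covers the half-open interval [k * timeshift, k * timeshift + timespan); the boundary handling and the name `BufferWindowSketch` are assumptions for illustration, not something taken from the RxJava source.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized model of buffer(timespan, timeshift): buffer k is assumed to
// cover [k * timeshift, k * timeshift + timespan). Hypothetical helper.
class BufferWindowSketch {

    // Returns the indices of all buffers that should contain a value emitted at time t.
    static List<Long> windowsContaining(long t, long timespan, long timeshift) {
        if (t < 0 || timespan <= 0 || timeshift <= 0) {
            throw new IllegalArgumentException("t must be >= 0, timespan and timeshift must be > 0");
        }
        List<Long> result = new ArrayList<>();
        // Only buffers that have already opened at time t can contain the value.
        for (long k = 0; k * timeshift <= t; k++) {
            if (t < k * timeshift + timespan) {
                result.add(k);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Value 9 is emitted around t = 90 ms in the test above.
        System.out.println(windowsContaining(90, 100, 101));  // [0]
        // With timeshift > timespan there is a gap between buffers.
        System.out.println(windowsContaining(100, 100, 101)); // []
        System.out.println(windowsContaining(105, 100, 101)); // [1]
        // With timeshift < timespan, overlap is expected.
        System.out.println(windowsContaining(90, 100, 50));   // [0, 1]
    }
}
```

Under this model a value emitted at about 90 ms belongs only to buffer 0 when the arguments are 100, 101, so seeing 9 in the second buffer cannot be explained by intended overlap.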

@ghost ghost assigned benjchristensen Jan 2, 2014
@Acardiac
Contributor

> If I change .buffer(100, 100, TimeUnit.MILLISECONDS) to .buffer(100, TimeUnit.MILLISECONDS) then I don't see overlaps when I run it many times.

And if you change it to .buffer(42, TimeUnit.MICROSECONDS), then some items will be dropped.

see also #756

> That is not expected, so I tend to agree that we have a bug.

+1

@akarnokd
Member

akarnokd commented May 8, 2014

This bug should be resolved in the latest release.

@benjchristensen
Member

This test now passes:

    @Test
    public void testDuplicate() throws Exception {
        final Set<Integer> unique = new HashSet<Integer>();
        Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(final Subscriber<? super Integer> t1) {
                final Future<?> t = Executors.newSingleThreadExecutor().submit(new Runnable() {
                    @Override
                    public void run() {
                        int count = 0;
                        for (int i = 0; i < 11; i++) {
                            t1.onNext(count++);
                            try {
                                Thread.sleep(10);
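                            } catch (InterruptedException e) {
                                // Cancelled via unsubscribe: restore the interrupt flag and stop emitting.
                                Thread.currentThread().interrupt();
                                return;
                            }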
                        }
                    }
                });

                t1.add(Subscriptions.from(t));
            }
        })
        .buffer(100, 100, TimeUnit.MILLISECONDS) 
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> t1) {
                if (t1.isEmpty()) 
                    return;

                System.out.println(t1.toString());
                for (Integer i : t1) {
                    synchronized (unique) {
                        if (unique.contains(i)) {
                            System.out.println("Duplicate for " + i);
                        }
                        Assert.assertFalse(unique.contains(i));
                        unique.add(i);
                    }
                }
            }
        });

        Thread.sleep(10000);

    }

It emits:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10]
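
For comparing the two outputs recorded in this thread without running the timing-dependent test, the duplicate check can be pulled out into a standalone helper. This is a minimal sketch; `DuplicateCheckSketch` and `findDuplicates` are hypothetical names, and the helper relies on `Set.add` returning false for a value that was already present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper that applies the same uniqueness check as the test
// to a list of already-collected buffers.
class DuplicateCheckSketch {

    // Returns every value that was already seen earlier, in order of detection.
    static List<Integer> findDuplicates(List<List<Integer>> buffers) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> duplicates = new ArrayList<>();
        for (List<Integer> buffer : buffers) {
            for (Integer value : buffer) {
                // Set.add returns false when the value is already in the set.
                if (!seen.add(value)) {
                    duplicates.add(value);
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Output reported earlier with buffer(100, 101, TimeUnit.MILLISECONDS).
        List<List<Integer>> before = Arrays.asList(
                Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
                Arrays.asList(9, 10));
        // Output reported after the fix.
        List<List<Integer>> after = Arrays.asList(
                Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
                Arrays.asList(10));

        System.out.println(findDuplicates(before)); // [9]
        System.out.println(findDuplicates(after));  // []
    }
}
```

Note that this helper also flags a value repeated within a single buffer, which matches what the shared `unique` set in the test does.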

5 participants