Skip to content

Refactoring conventions #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 10, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rxjava-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
apply plugin: 'java'
apply plugin: 'eclipse'

// we want to target Java 1.5 so this can be used on Android
sourceCompatibility = JavaVersion.VERSION_1_5
targetCompatibility = JavaVersion.VERSION_1_5

dependencies {
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/org/rx/functions/Func0.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface Func0<R> {
public R call();
}
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/org/rx/functions/Func1.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface Func1<R, T1> {
public R call(T1 t1);
}
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/org/rx/functions/Func2.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface Func2<R, T1, T2> {
public R call(T1 t1, T2 t2);
}
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/org/rx/functions/Func3.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface Func3<R, T1, T2, T3> {
public R call(T1 t1, T2 t2, T3 t3);
}
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/org/rx/functions/Func4.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.rx.functions;

public interface Func4<R, T1, T2, T3, T4> {
public R call(T1 t1, T2 t2, T3 t3, T4 t4);
public R call(T1 t1, T2 t2, T3 t3, T4 t4);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@

import javax.annotation.concurrent.ThreadSafe;

import org.rx.reactive.IDisposable;

import org.rx.reactive.Subscription;

/**
* Thread-safe wrapper around WatchableSubscription that ensures unsubscribe can be called only once.
* Thread-safe wrapper around ObservableSubscription that ensures unsubscribe can be called only once.
*/
@ThreadSafe
/* package */class AtomicWatchableSubscription implements IDisposable {
/* package */final class AtomicObservableSubscription implements Subscription {

private AtomicReference<IDisposable> actualSubscription = new AtomicReference<IDisposable>();
private AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
private AtomicBoolean unsubscribed = new AtomicBoolean(false);

public AtomicWatchableSubscription() {
public AtomicObservableSubscription() {

}

public AtomicWatchableSubscription(IDisposable actualSubscription) {
public AtomicObservableSubscription(Subscription actualSubscription) {
this.actualSubscription.set(actualSubscription);
}

Expand All @@ -32,7 +31,7 @@ public AtomicWatchableSubscription(IDisposable actualSubscription) {
* @throws IllegalStateException
* if trying to set more than once (or use this method after setting via constructor)
*/
public AtomicWatchableSubscription setActual(IDisposable actualSubscription) {
public AtomicObservableSubscription setActual(Subscription actualSubscription) {
if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
throw new IllegalStateException("Can not set subscription more than once.");
}
Expand All @@ -42,7 +41,7 @@ public AtomicWatchableSubscription setActual(IDisposable actualSubscription) {
@Override
public void unsubscribe() {
// get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once
IDisposable actual = actualSubscription.getAndSet(null);
Subscription actual = actualSubscription.getAndSet(null);
// if it's not null we will unsubscribe
if (actual != null) {
actual.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import javax.annotation.concurrent.ThreadSafe;

import org.rx.reactive.IObserver;
import org.rx.reactive.Observer;

/**
* A thread-safe Watcher for transitioning states in operators.
* A thread-safe Observer for transitioning states in operators.
* <p>
* Allows both single-threaded and multi-threaded execution controlled by the following FastProperty:
* <li>reactive.watcher.multithreaded.enabled [Default: false]</li>
* <li>reactive.Observer.multithreaded.enabled [Default: false]</li>
* <p>
* Single-threaded Execution rules are:
* <ul>
Expand All @@ -29,7 +29,7 @@
* @param <T>
*/
@ThreadSafe
/* package */class AtomicWatcher<T> implements IObserver<T> {
/* package */final class AtomicObserver<T> implements Observer<T> {

/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
private static boolean allowMultiThreaded = true;
Expand All @@ -41,29 +41,29 @@
}
}

private final IObserver<T> watcher;
private final Observer<T> Observer;

public AtomicWatcher(IObserver<T> watcher, AtomicWatchableSubscription subscription) {
public AtomicObserver(Observer<T> Observer, AtomicObservableSubscription subscription) {
if (allowMultiThreaded) {
this.watcher = new AtomicWatcherMultiThreaded<T>(watcher, subscription);
this.Observer = new AtomicObserverMultiThreaded<T>(Observer, subscription);
} else {
this.watcher = new AtomicWatcherSingleThreaded<T>(watcher, subscription);
this.Observer = new AtomicObserverSingleThreaded<T>(Observer, subscription);
}
}

@Override
public void onCompleted() {
watcher.onCompleted();
Observer.onCompleted();
}

@Override
public void onError(Exception e) {
watcher.onError(e);
Observer.onError(e);
}

@Override
public void onNext(T args) {
watcher.onNext(args);
Observer.onNext(args);
}

}
Loading