Skip to content

Stable non-aio changesets (mostly channel refactoring) #613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/rt/rust_chan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
rust_chan::rust_chan(rust_task *task,
maybe_proxy<rust_port> *port,
size_t unit_sz) :
ref_count(1),
task(task),
port(port),
buffer(task, unit_sz) {
Expand Down Expand Up @@ -97,6 +98,60 @@ void rust_chan::send(void *sptr) {

return;
}

rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
size_t unit_sz = buffer.unit_sz;
maybe_proxy<rust_port> *port = this->port;
rust_task *target_task = NULL;
if (target->is_proxy() == false) {
port = this->port;
target_task = target->referent();
} else {
rust_handle<rust_port> *handle =
task->sched->kernel->get_port_handle(port->as_referent());
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
LOG(task, mem, "new proxy: " PTR, proxy);
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task) rust_chan(target_task, port, unit_sz);
}

/**
* Cannot Yield: If the task were to unwind, the dropped ref would still
* appear to be live, causing modify-after-free errors.
*/
void rust_chan::destroy() {
A(task->sched, ref_count == 0,
"Channel's ref count should be zero.");

if (is_associated()) {
if (port->is_proxy()) {
// Here is a good place to delete the port proxy we allocated
// in upcall_clone_chan.
rust_proxy<rust_port> *proxy = port->as_proxy();
disassociate();
delete proxy;
} else {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormnat" state by not freeing
// it and letting the receiver task delete it for us instead.
if (buffer.is_empty() == false) {
return;
}
disassociate();
}
}
delete this;
}

//
// Local Variables:
// mode: C++
Expand Down
9 changes: 7 additions & 2 deletions src/rt/rust_chan.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#ifndef RUST_CHAN_H
#define RUST_CHAN_H

class rust_chan : public rc_base<rust_chan>,
public task_owned<rust_chan>,
class rust_chan : public task_owned<rust_chan>,
public rust_cond {
public:
RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy())
rust_chan(rust_task *task, maybe_proxy<rust_port> *port, size_t unit_sz);

~rust_chan();
Expand All @@ -19,6 +19,11 @@ class rust_chan : public rc_base<rust_chan>,
bool is_associated();

void send(void *sptr);

rust_chan *clone(maybe_proxy<rust_task> *target);

// Called whenever the channel's ref count drops to zero.
void destroy();
};

//
Expand Down
21 changes: 10 additions & 11 deletions src/rt/rust_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,18 @@ static intptr_t const CONST_REFCOUNT = 0x7badface;
static size_t const BUF_BYTES = 2048;

// Every reference counted object should derive from this base class.
// Or use this macro. The macro is preferred as the base class will be
// disappearing.

template <typename T> struct rc_base {
intptr_t ref_count;

void ref() {
++ref_count;
}
#define RUST_REFCOUNTED(T) \
RUST_REFCOUNTED_WITH_DTOR(T, delete (T*)this)
#define RUST_REFCOUNTED_WITH_DTOR(T, dtor) \
intptr_t ref_count; \
void ref() { ++ref_count; } \
void deref() { if (--ref_count == 0) { dtor; } }

void deref() {
if (--ref_count == 0) {
delete (T*)this;
}
}
template <typename T> struct rc_base {
RUST_REFCOUNTED(T)

rc_base();
~rc_base();
Expand Down
46 changes: 2 additions & 44 deletions src/rt/rust_upcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
scoped_lock with(task->kernel->scheduler_lock);

LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);

A(task->sched, chan->ref_count == 0,
"Channel's ref count should be zero.");

if (chan->is_associated()) {
if (chan->port->is_proxy()) {
// Here is a good place to delete the port proxy we allocated
// in upcall_clone_chan.
rust_proxy<rust_port> *proxy = chan->port->as_proxy();
chan->disassociate();
delete proxy;
} else {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormant" state by not freeing
// it and letting the receiver task delete it for us instead.
if (chan->buffer.is_empty() == false) {
return;
}
chan->disassociate();
}
}
delete chan;
chan->destroy();
}

/**
Expand All @@ -166,21 +138,7 @@ upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
scoped_lock with(task->kernel->scheduler_lock);
size_t unit_sz = chan->buffer.unit_sz;
maybe_proxy<rust_port> *port = chan->port;
rust_task *target_task = NULL;
if (target->is_proxy() == false) {
port = chan->port;
target_task = target->referent();
} else {
rust_handle<rust_port> *handle =
task->sched->kernel->get_port_handle(port->as_referent());
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
LOG(task, mem, "new proxy: " PTR, proxy);
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task) rust_chan(target_task, port, unit_sz);
return chan->clone(target);
}

extern "C" CDECL void
Expand Down