00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #define ACEDIA_REDUCED_ANNOUNCE
00031
00032 #ifndef _XOPEN_SOURCE
00033 # define _XOPEN_SOURCE
00034 #endif
00035
00036 #include "acedia_config.hpp"
00037 #include "actor.hpp"
00038 #include "actorref.hpp"
00039 #include "exceptions.hpp"
00040
00041 #include <boost/bind.hpp>
00042
00043 #include <iostream>
00044
00045 typedef boost::lock_guard<boost::mutex> MutexLocker;
00046
00047
00048 namespace acedia
00049 {
00050
00051 static const int DETACHED_OR_HIDDEN = DETACHED | HIDDEN;
00052
00053 void Actor::reply(const Any &val1)
00054 {
00055 send(m_lrm.sender(), val1);
00056 }
00057
00058 void Actor::reply(const acedia::Any &val1, const acedia::Any &val2)
00059 {
00060 send(m_lrm.sender(),val1,val2);
00061 }
00062
00063 struct NopProcessor : public details::AnyArrayProcessor
00064 {
00065 details::ContinuationInvoker ci;
00066
00067 NopProcessor() : ci(this, NULL) { }
00068
00069 virtual bool operator()(const AnyArray &) { return true; }
00070
00071 virtual details::ContinuationInvoker *matchingInvoker(const AnyArray &)
00072 {
00073 return &ci;
00074 }
00075 };
00076
00077 bool Actor::tryReceive(Message &storage)
00078 {
00079 details::container::SingleReaderListIterator<Message> i(m_mailbox);
00080 if (i.hasNext())
00081 {
00082 m_lrm = i.next();
00083 storage = m_lrm;
00084 i.erase();
00085 yield(READY);
00086 return true;
00087 }
00088 yield(READY);
00089 return false;
00090 }
00091
00092 Message Actor::receive()
00093 {
00094 Invoker i;
00095 i.add(new NopProcessor);
00096 receiveAndInvoke(i);
00097 ACEDIA_MEMORY_BARRIER();
00098 return m_lrm;
00099 }
00100
00101 bool Actor::tryReceiveAndInvoke(Invoker &imp)
00102 {
00103 details::ContinuationInvoker *invoker;
00104 details::container::SingleReaderListIterator<Message> i(m_mailbox);
00105 while (i.hasNext())
00106 {
00107 Message msg = i.next();
00108 invoker = imp.matchingInvokerTo(msg, m_trapExit);
00109 if (invoker)
00110 {
00111 m_lrm = msg;
00112 i.erase();
00113 yield(READY);
00114 ACEDIA_MEMORY_BARRIER();
00115 (*invoker)();
00116 return true;
00117 }
00118 }
00119 yield(READY);
00120 return false;
00121 }
00122
00123 void setBoolToFalse(bool* b)
00124 {
00125 *b = false;
00126 }
00127
00128 void doNothingCallback() { }
00129
00130 bool Actor::receiveWithin(Message& storage, boost::uint16_t msTimeout)
00131 {
00132 Invoker nopInvoker;
00133 nopInvoker.add(others() >> boost::function0<void>(doNothingCallback));
00134 if (receiveAndInvokeWithin(nopInvoker, msTimeout))
00135 {
00136 storage = lastReceivedMessage();
00137 return true;
00138 }
00139 return false;
00140 }
00141
00142 void Actor::receiveAndInvoke(Invoker &imp)
00143 {
00144 bool success = false;
00145 {
00146 details::ContinuationInvoker *invoker;
00147 details::container::SingleReaderListIterator<Message> i(m_mailbox);
00148 while (!success && i.hasNext())
00149 {
00150 Message msg = i.next();
00151 invoker = imp.matchingInvokerTo(msg, m_trapExit);
00152 if (invoker)
00153 {
00154 m_lrm = msg;
00155 i.erase();
00156 success = true;
00157 m_continuationInvoker = invoker;
00158 }
00159 }
00160 if (!success)
00161 {
00162
00163 util::ExclusiveLockGuard guard(m_mailboxLock);
00164
00165
00166 while (!success && i.hasNext())
00167 {
00168 Message msg = i.next();
00169 invoker = imp.matchingInvokerTo(msg, m_trapExit);
00170 if (invoker)
00171 {
00172 m_lrm = msg;
00173 i.erase();
00174 success = true;
00175 m_continuationInvoker = invoker;
00176 }
00177 }
00178 if (!success)
00179 {
00180
00181 m_continuationInvoker = NULL;
00182 m_receptionist = &imp;
00183 }
00184 }
00185 }
00186 m_lrmSet = success;
00187 ACEDIA_MEMORY_BARRIER();
00188
00189 if (success)
00190 {
00191
00192 yield(READY);
00193 }
00194 else
00195 {
00196
00197
00198
00199 do
00200 {
00201 yield(BLOCKED);
00202 }
00203 while (NULL == m_continuationInvoker);
00204 }
00205 ACEDIA_MEMORY_BARRIER();
00206
00207 (*const_cast<details::ContinuationInvoker*>(m_continuationInvoker))();
00208 m_continuationInvoker = NULL;
00209 }
00210
00211 bool Actor::setState(ActorState oldState, ActorState nextState)
00212 {
00213 if (nextState == EXITED)
00214 {
00215 bool result = false;
00216 if (atomic::compareAndSwap(&m_state, oldState, EXITED))
00217 {
00218 {
00219
00220 util::ExclusiveLockGuard guard(m_mailboxLock);
00221
00222 m_lrm = Message();
00223
00224 m_mailbox.clear();
00225 }
00226 result = true;
00227
00228 {
00229 MutexLocker locker(m_linksMutex);
00230 Any exitTuple = Exit(m_exitReason);
00231 for (std::list<ActorRef>::iterator i = m_links.begin(); i != m_links.end(); ++i)
00232 {
00233 (*i).send(Message(self(), *i, exitTuple));
00234 }
00235 m_links.clear();
00236 }
00237 ACEDIA_MEMORY_BARRIER();
00238
00239 try { onExit(); }
00240 catch (...) { }
00241 }
00242 return result;
00243 }
00244 else
00245 {
00246 return atomic::compareAndSwap(&m_state, oldState, nextState);
00247 }
00248 }
00249
00250
00251 void Actor::resume(Context *schedulerCtx)
00252 {
00253
00254
00255 m_returnContext = const_cast<volatile Context*>(schedulerCtx);
00256 ACEDIA_MEMORY_BARRIER();
00257 Context::swap(*schedulerCtx, *m_context);
00258 }
00259
00260 void Actor::setLrm(const Message &msg)
00261 {
00262 m_lrm = msg;
00263 m_receptionist = NULL;
00264 m_timeoutReceptionist = NULL;
00265
00266 ActorState as = m_state;
00267 if (as == RUNNING)
00268 {
00269 while ((as = m_state) & RUNNING) ;
00270 }
00271
00272
00273
00274
00275
00276 for (;;)
00277 {
00278 assert(as & (BLOCKED | WAITING | DETACHED_OR_HIDDEN));
00279 if (as & (BLOCKED | WAITING) && setState(as, READY))
00280 {
00281
00282 m_lrmSet = true;
00283
00284
00285 ACEDIA_MEMORY_BARRIER();
00286 details::enqueueToScheduler(this);
00287 return;
00288 }
00289 else if (as & DETACHED_OR_HIDDEN)
00290 {
00291
00292
00293
00294 m_lrmSet = true;
00295 ACEDIA_MEMORY_BARRIER();
00296
00297
00298 m_lrmSetCondition.notify_one();
00299 return;
00300 }
00301 else as = m_state;
00302 }
00303 }
00304
00305 bool Actor::receiveAndInvokeWithin(Invoker &imp, boost::uint16_t msTimeout)
00306 {
00307 bool msgReceived = true;
00308
00309 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00310
00311 while (0 == ++m_timeoutRequestId) { }
00312 boost::int32_t reqId = m_timeoutRequestId;
00313 Invoker toutInvoker(on<TimeoutEvent>().unbox().guard(isEq(reqId))
00314 >> boost::function0<void>(boost::bind(setBoolToFalse, &msgReceived)));
00315 m_timeoutReceptionist = &toutInvoker;
00316 send(timeEmitter, TimeoutRequest(reqId, msTimeout));
00317
00318 receiveAndInvoke(imp);
00319 return msgReceived;
00320 }
00321
00322 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1)
00323 {
00324 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00325 send(timeEmitter, FutureMessageRequest(msTimeout), v1);
00326 }
00327
00328 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2)
00329 {
00330 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00331 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2);
00332 }
00333
00334 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3)
00335 {
00336 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00337 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3);
00338 }
00339
00340 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3, const Any& v4)
00341 {
00342 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00343 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3,v4);
00344 }
00345
00346 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3, const Any& v4, const Any& v5)
00347 {
00348 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00349 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3,v4,v5);
00350 }
00351
00352 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3, const Any& v4, const Any& v5, const Any& v6)
00353 {
00354 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00355 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3,v4,v5,v6);
00356 }
00357
00358 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3, const Any& v4, const Any& v5, const Any& v6, const Any& v7)
00359 {
00360 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00361 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3,v4,v5,v6,v7);
00362 }
00363
00364 void Actor::futureMessage(boost::uint32_t msTimeout, const Any& v1, const Any& v2, const Any& v3, const Any& v4, const Any& v5, const Any& v6, const Any& v7, const Any& v8)
00365 {
00366 ActorRef timeEmitter = whereisName("acedia_TimeEmitter");
00367 send(timeEmitter,FutureMessageRequest(msTimeout),v1,v2,v3,v4,v5,v6,v7,v8);
00368 }
00369
00370 bool Actor::filterTimeoutEvents(const Message& msg)
00371 {
00372 if (msg.match<TimeoutEvent>())
00373 {
00374
00375 util::SharedLockGuard mboxGuard(m_mailboxLock);
00376
00377 if (state() != EXITED)
00378 {
00379 if (m_timeoutReceptionist)
00380 {
00381
00382 util::SpinlockGuard recGuard(m_receptionistLock);
00383 Invoker* r = const_cast<Invoker*>(m_timeoutReceptionist);
00384 if (r)
00385 {
00386 details::ContinuationInvoker *i =
00387 r->matchingInvokerTo(msg, m_trapExit);
00388 if (i)
00389 {
00390 m_continuationInvoker = i;
00391 Message lrmAgain = lastReceivedMessage();
00392 setLrm(lrmAgain);
00393 }
00394 }
00395 }
00396 }
00397 return true;
00398 }
00399 return false;
00400 }
00401
00402 bool Actor::filterIncomingMessage(const Message& msg)
00403 {
00404 if (msg.match<AddRemoteLink>())
00405 {
00406
00407 ActorRef who = msg.sender();
00408 if (who != self())
00409 {
00410 MutexLocker locker(m_linksMutex);
00411 if (m_state == EXITED) send(who, Exit(exitReason()));
00412 else m_links.push_back(who);
00413 }
00414 return true;
00415 }
00416 else return filterTimeoutEvents(msg);
00417 }
00418
00419 void Actor::enqueue(const Message &msg)
00420 {
00421 if (!filterIncomingMessage(msg) && state() != EXITED)
00422 {
00423
00424 util::SharedLockGuard mboxGuard(m_mailboxLock);
00425
00426 if (state() != EXITED)
00427 {
00428 if (m_receptionist)
00429 {
00430
00431 util::SpinlockGuard recGuard(m_receptionistLock);
00432 Invoker *r = const_cast<Invoker*>(m_receptionist);
00433 if (r)
00434 {
00435 details::ContinuationInvoker *i =
00436 r->matchingInvokerTo(msg, m_trapExit);
00437 if (i)
00438 {
00439 m_continuationInvoker = i;
00440 setLrm(msg);
00441 }
00442 else
00443 {
00444 m_mailbox.append(msg);
00445 }
00446 }
00447 else
00448 {
00449 m_mailbox.append(msg);
00450 }
00451 }
00452 else
00453 {
00454
00455 m_mailbox.append(msg);
00456 }
00457 }
00458 }
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520 }
00521
00522 void Actor::yield(ActorState mYieldState)
00523 {
00524 if (mYieldState == EXITED) throw ActorKilledException(m_exitReason);
00525 else if (isDetached() && mYieldState == READY)
00526 {
00527
00528
00529 return;
00530 }
00531 *yieldState = mYieldState;
00532 ACEDIA_MEMORY_BARRIER();
00533 Context::swap(*m_context, *const_cast<Context*>(m_returnContext));
00534 ACEDIA_MEMORY_BARRIER();
00535 }
00536
00537 Actor::Actor(ActorState initialState, bool tExit) :
00538 AbstractActor(false, false),
00539 m_state(initialState), m_context(NULL),
00540 m_returnContext(NULL),
00541 m_receptionist(NULL), m_timeoutReceptionist(NULL),
00542 m_trapExit(tExit),
00543 m_lrmSet(false), m_timeoutRequestId(0),
00544 next(NULL), yieldState(NULL)
00545 {
00546 assert(initialState & (UNINITIALIZED | DETACHED_OR_HIDDEN));
00547 if (initialState & DETACHED_OR_HIDDEN)
00548 {
00549 m_state = UNINITIALIZED | initialState;
00550 }
00551 }
00552
00553 void Actor::initialize()
00554 {
00555 #if defined(ACEDIA_UCONTEXT_IMPL)
00556 m_context = new Context((void (*)()) &Actor::ucontext_trampoline,
00557 (int) this);
00558 #elif defined(ACEDIA_FIBER_IMPL)
00559 m_context = new Context((LPFIBER_START_ROUTINE) &Actor::fiber_trampoline,
00560 (LPVOID) this);
00561 #endif
00562 if (m_state == UNINITIALIZED)
00563 {
00564 assert(setState(UNINITIALIZED, READY));
00565 }
00566 else if (m_state == (UNINITIALIZED | DETACHED))
00567 {
00568 assert(setState((UNINITIALIZED | DETACHED), DETACHED));
00569 }
00570 else if (m_state == (UNINITIALIZED | HIDDEN))
00571 {
00572 assert(setState((UNINITIALIZED | HIDDEN), HIDDEN));
00573 }
00574 else
00575 {
00576 assert(false);
00577 }
00578 }
00579
00580 Actor::~Actor()
00581 {
00582 if (m_context) delete m_context;
00583
00584 m_mailbox.clear();
00585 }
00586
00587 void Actor::linkTo(const ActorRef &who)
00588 {
00589 if (who == self()) return;
00590 MutexLocker locker(m_linksMutex);
00591 ActorRef s = self();
00592 if (m_state == EXITED)
00593 {
00594 ActorRef receiver = who;
00595 send(receiver, Exit(exitReason()));
00596 }
00597 else if (who.actor->backlinkTo(s))
00598 {
00599 m_links.push_back(who);
00600 }
00601 }
00602
00603
00604 bool Actor::backlinkTo(ActorRef &who)
00605 {
00606 if (who == self()) return false;
00607 {
00608 MutexLocker locker(m_linksMutex);
00609 if (m_state == EXITED)
00610 {
00611 send(who, Exit(exitReason()));
00612 }
00613 else m_links.push_back(who);
00614 }
00615 return true;
00616 }
00617
00618 void Actor::onExit() { }
00619
00620 void Actor::doExit(boost::int32_t reason)
00621 {
00622 throw ActorKilledException(reason);
00623 }
00624
00625 void Actor::normalExit()
00626 {
00627 doExit(exit_reasons::NORMAL_EXIT);
00628 }
00629
00630 void appStr(String &str, const char *what)
00631 {
00632 if (str.empty()) str += what;
00633 else
00634 {
00635 str += "|";
00636 str += what;
00637 }
00638 }
00639
00640 String Actor::stateAsString() const
00641 {
00642 String str;
00643 ActorState s = m_state;
00644 if (s & UNINITIALIZED) appStr(str, "UNINITIALIZED");
00645 if (s & READY) appStr(str, "READY");
00646 if (s & BLOCKED) appStr(str, "BLOCKED");
00647 if (s & WAITING) appStr(str, "WAITING");
00648 if (s & RUNNING) appStr(str, "RUNNING");
00649 if (s & DETACHED) appStr(str, "DETACHED");
00650 if (s & HIDDEN) appStr(str, "HIDDEN");
00651 if (s & EXITED) appStr(str, "EXITED");
00652 return str;
00653 }
00654
00655 }