00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #define ACEDIA_REDUCED_ANNOUNCE
00031
00032 #ifndef _XOPEN_SOURCE
00033 # define _XOPEN_SOURCE
00034 #endif
00035
00036
00037 #include <list>
00038 #include <sstream>
00039 #include <set>
00040
00041
00042 #include <boost/thread.hpp>
00043 #include <boost/cstdint.hpp>
00044 #include <boost/thread/mutex.hpp>
00045 #include <boost/thread/condition_variable.hpp>
00046
00047
00048 #include "actor.hpp"
00049 #include "acedia.hpp"
00050 #include "acedia_atomic.hpp"
00051 #include "acedia_util.hpp"
00052
00053 #include <iostream>
00054 using std::cout;
00055 using std::endl;
00056
00057 namespace acedia
00058 {
00059 namespace details
00060 {
00061 class InvokeVoidFunctor : public AnyArrayProcessor
00062 {
00063
00064 boost::function0<void> f;
00065
00066 public:
00067
00068 InvokeVoidFunctor(const boost::function0<void> &func) : f(func) { }
00069
00070 virtual bool operator()(const AnyArray &)
00071 {
00072 f();
00073 return true;
00074 }
00075
00076 virtual ContinuationInvoker *matchingInvoker(const AnyArray &arr)
00077 {
00078 return new ContinuationInvoker(this, &arr);
00079 }
00080
00081 };
00082
00083 }
00084
// Intentional no-op; usable wherever a void() callback is required but
// nothing should happen.
void discard()
{
}
00086
00087 details::AnyArrayProcessor* MatchAllRuleBuilder::operator>>(void (*voidFunction)())
00088 {
00089 return ((*this) >> boost::function0<void>(voidFunction));
00090 }
00091
00092 details::AnyArrayProcessor* MatchAllRuleBuilder::operator>>(const boost::function0<void> &f)
00093 {
00094 return new details::InvokeVoidFunctor(f);
00095 }
00096
00097 static boost::int32_t schedulerStartetFlag = 0;
00098
00099 static boost::int32_t runningActors = 0;
00100
00101 static boost::condition_variable noRunningActorsCv;
00102
00103 inline void incRunningActors()
00104 {
00105 (void) atomic::addAndFetch(&runningActors, 1);
00106 }
00107
00108 inline void decRunningActors()
00109 {
00110 if (0 == atomic::addAndFetch(&runningActors, -1))
00111 noRunningActorsCv.notify_all();
00112 }
00113
00114
00115
00116
00117
00118 struct TimeEmitter : public Actor
00119 {
00120
00121
00122 typedef std::multimap<boost::system_time ,
00123 Message >
00124 TimeoutMap;
00125 typedef TimeoutMap::value_type TimeoutMapEntry;
00126
00127 TimeoutMap timeouts;
00128
00129 TimeEmitter() : Actor(HIDDEN), timeouts() { }
00130
00131 void handleRequest(ActorRef requester, boost::int32_t reqId, boost::uint32_t msTimeout)
00132 {
00133 assert(requester.isValid());
00134 boost::system_time tOut = boost::get_system_time();
00135 tOut += boost::posix_time::milliseconds(msTimeout);
00136 timeouts.insert(TimeoutMapEntry(tOut,
00137 Message(requester, requester,
00138 TimeoutEvent(reqId))));
00139
00140 }
00141
00142 bool timeoutHappens()
00143 {
00144 boost::system_time st = boost::get_system_time();
00145 TimeoutMap::iterator it = timeouts.begin();
00146 if (it != timeouts.end() && (it->first) < st)
00147 {
00148
00149
00150 Message delivery = it->second;
00151 ActorRef whom = delivery.receiver();
00152 whom.send(delivery);
00153
00154 timeouts.erase(it);
00155 return true;
00156 }
00157 return false;
00158 }
00159
00160 Message createFutureMessage(const Message& from)
00161 {
00162 ActorRef s = from.sender();
00163
00164
00165 switch (from.length())
00166 {
00167 case 2: return Message(s,s,from.at(1));
00168 case 3: return Message(s,s,from.at(1),from.at(2));
00169 case 4: return Message(s,s,from.at(1),from.at(2),from.at(3));
00170 case 5: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4));
00171 case 6: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5));
00172 case 7: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6));
00173 case 8: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6),from.at(7));
00174 case 9: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6),from.at(7),from.at(8));
00175 default: return Message(s,s);
00176 }
00177 }
00178
00179 virtual void act()
00180 {
00181 Message msg;
00182 MetaClass* timeoutRequestMetaClass =
00183 getMetaClass("acedia::TimeoutRequest");
00184 MetaClass* futureMessageRequestMetaClass =
00185 getMetaClass("acedia::FutureMessageRequest");
00186 assert(NULL != timeoutRequestMetaClass);
00187 for (;;)
00188 {
00189 while (tryReceive(msg))
00190 {
00191 if (msg.metaClassAt(0) == timeoutRequestMetaClass)
00192 {
00193 const TimeoutRequest& tr =
00194 msg.uncheckedValueAt<TimeoutRequest>(0);
00195 handleRequest(msg.sender(), tr.requestId(), tr.msTimeout());
00196 }
00197 else if (msg.metaClassAt(0) == futureMessageRequestMetaClass)
00198 {
00199 if (msg.sender().isValid())
00200 {
00201 const FutureMessageRequest& req =
00202 msg.uncheckedValueAt<FutureMessageRequest>(0);
00203 boost::uint32_t msTimeout = req.msTimeout();
00204 boost::system_time tOut = boost::get_system_time();
00205 tOut += boost::posix_time::milliseconds(msTimeout);
00206 timeouts.insert(TimeoutMapEntry(tOut,
00207 createFutureMessage(msg)));
00208 }
00209 }
00210 }
00211
00212 while (timeoutHappens())
00213 {
00214
00215 }
00216
00217 boost::system_time st = boost::get_system_time();
00218 st += boost::posix_time::milliseconds(1);
00219 boost::this_thread::sleep(st);
00220 }
00221 }
00222
00223 };
00224
00225
00226
00227
00228
00229
00230 struct Worker;
00231
00232 struct SchedulerData
00233 {
00234 Worker *worker;
00235 boost::uint32_t lastReadedTStamp;
00236 boost::system_time timeout;
00237 inline SchedulerData() throw() :
00238 worker(NULL), lastReadedTStamp(UInt32Maximum),
00239 timeout(boost::get_system_time())
00240 {
00241 }
00242 ~SchedulerData();
00243 };
00244
00245 struct Scheduler
00246 {
00247
00248 volatile bool run;
00249 volatile bool exited;
00250
00251 details::container::intrusive::SingleReaderQueue<Actor> jobQ;
00252 util::Semaphore jobsQSem;
00253 util::Spinlock jobQLock;
00254
00255 Scheduler() : run(true), exited(false) { }
00256
00257
00258 Actor *getJob()
00259 {
00260
00261 if (!run) return NULL;
00262 while (!jobsQSem.tryAcquire(1, 30))
00263 {
00264 if (!run) return NULL;
00265 }
00266
00267 {
00268 util::SpinlockGuard guard(jobQLock);
00269 return jobQ.tryDequeue();
00270 }
00271 }
00272
00273
00274 Actor *tryGetJob()
00275 {
00276
00277 if (jobsQSem.tryAcquire())
00278 {
00279 util::SpinlockGuard guard(jobQLock);
00280 return jobQ.tryDequeue();
00281 }
00282 return NULL;
00283 }
00284
00285 void enqueue(Actor *a)
00286 {
00287
00288 jobQ.enqueue(a);
00289 jobsQSem.release();
00290 }
00291
00292
00293 void operator()();
00294
00295 } scheduler;
00296
00297 struct Worker : public ReferenceCounted
00298 {
00299 volatile boost::uint32_t timestamp;
00300 volatile bool detached;
00301 volatile bool quit;
00302 volatile bool isRunning;
00303 volatile bool exited;
00304 boost::mutex mm;
00305
00306 Actor *a;
00307
00308 Worker() : ReferenceCounted(1), timestamp(0), detached(false),
00309 quit(false), isRunning(false), exited(false), a(NULL)
00310 {
00311 }
00312
00313 Worker(Actor *detachedActor) : ReferenceCounted(1), timestamp(0),
00314 detached(true), quit(false), isRunning(false), exited(false),
00315 a(NULL)
00316 {
00317 assert(detachedActor->isDetached());
00318 a = detachedActor;
00319 }
00320
00321 void waitUntilDetachedActorBecomesReady()
00322 {
00323 boost::unique_lock<boost::mutex> lock(mm);
00324 while (!a->m_lrmSet)
00325 {
00326
00327
00328 boost::system_time timeout = boost::get_system_time();
00329 timeout += boost::posix_time::milliseconds(25);
00330 a->m_lrmSetCondition.timed_wait(lock, timeout);
00331 }
00332 }
00333
00334 void runDetached()
00335 {
00336 Context ctx;
00337 assert(NULL != a && a->isDetached());
00338
00339 if (a->state() & UNINITIALIZED) a->initialize();
00340
00341
00342 volatile ActorState nextState;
00343 a->yieldState = &nextState;
00344 while (!quit)
00345 {
00346 ACEDIA_MEMORY_BARRIER();
00347 a->resume(&ctx);
00348 ACEDIA_MEMORY_BARRIER();
00349 switch (nextState)
00350 {
00351 case BLOCKED:
00352 waitUntilDetachedActorBecomesReady();
00353 break;
00354
00355 case READY: break;
00356
00357 case EXITED:
00358
00359
00360 timestamp = timestamp + 1;
00361 if (a->state() == DETACHED)
00362 {
00363 assert(a->setState(DETACHED, EXITED));
00364
00365
00366 ACEDIA_MEMORY_BARRIER();
00367 decRunningActors();
00368 }
00369
00370 else assert(a->setState(HIDDEN, EXITED));
00371
00372 if (!a->deref()) delete a;
00373 a = NULL;
00374 quit = true;
00375 break;
00376
00377 default: throw 666;
00378 }
00379 }
00380 }
00381
00382 void operator()()
00383 {
00384 if (detached)
00385 {
00386 runDetached();
00387 exited = true;
00388 return;
00389 }
00390 ActorState nextState;
00391 Context ctx;
00392 while (!quit)
00393 {
00394 isRunning = false;
00395 ACEDIA_MEMORY_BARRIER();
00396 if (a)
00397 {
00398 Actor *nextJob = scheduler.tryGetJob();
00399
00400 if (nextJob)
00401 {
00402
00403 scheduler.enqueue(a);
00404
00405 a = nextJob;
00406 }
00407
00408 }
00409 else
00410 {
00411
00412 a = scheduler.getJob();
00413 if (!a)
00414 {
00415 quit = true;
00416 ACEDIA_MEMORY_BARRIER();
00417 exited = true;
00418 return;
00419 }
00420 }
00421 timestamp = timestamp + 1;
00422
00423
00424 ACEDIA_MEMORY_BARRIER();
00425 isRunning = true;
00426 if (a->state() & UNINITIALIZED) a->initialize();
00427 assert(!a->isDetached());
00428 a->yieldState = &nextState;
00429 assert(a->setState(READY, RUNNING));
00430 ACEDIA_MEMORY_BARRIER();
00431 a->resume(&ctx);
00432 ACEDIA_MEMORY_BARRIER();
00433
00434 if (detached)
00435 {
00436 if (nextState == EXITED)
00437 {
00438 assert(a->setState(RUNNING, EXITED));
00439 if (!a->deref()) delete a;
00440 a = NULL;
00441 }
00442 else
00443 {
00444
00445 waitUntilDetachedActorBecomesReady();
00446 runDetached();
00447 }
00448 decRunningActors();
00449 exited = true;
00450 return;
00451 }
00452 else switch(nextState)
00453 {
00454 case WAITING:
00455 case BLOCKED:
00456 assert(a->setState(RUNNING, nextState));
00457 a = NULL;
00458 break;
00459
00460 case READY:
00461 assert(a->setState(RUNNING, READY));
00462 break;
00463
00464 case EXITED:
00465
00466
00467 timestamp = timestamp + 1;
00468 ACEDIA_MEMORY_BARRIER();
00469 assert(a->setState(RUNNING, EXITED));
00470 ACEDIA_MEMORY_BARRIER();
00471 if (!a->deref()) delete a;
00472 a = NULL;
00473 decRunningActors();
00474 break;
00475
00476 default: throw 666;
00477 }
00478 }
00479 exited = true;
00480 }
00481
00482 };
00483
00484 struct WorkerRunner
00485 {
00486 Worker *w;
00487 WorkerRunner(Worker *worker) : w(worker) { if(w) w->ref(); }
00488 ~WorkerRunner()
00489 {
00490 if (w && !w->deref()) delete w;
00491 }
00492 WorkerRunner(const WorkerRunner &other) : w(other.w)
00493 {
00494 if (w) w->ref();
00495 }
00496 inline void operator()()
00497 {
00498 if (w) (*w)();
00499 }
00500 };
00501
00502 SchedulerData::~SchedulerData()
00503 {
00504 if (worker && !worker->deref()) delete worker;
00505 }
00506
00507 void Scheduler::operator()()
00508 {
00509 boost::uint32_t hc = boost::thread::hardware_concurrency();
00510 SchedulerData *data = new SchedulerData[hc];
00511 for (boost::uint32_t i = 0; i < hc; ++i)
00512 {
00513 Worker *w = new Worker;
00514 data[i].worker = w;
00515 boost::thread(WorkerRunner(w)).detach();
00516 }
00517 while (run)
00518 {
00519 for (boost::uint32_t i = 0; i < hc; ++i)
00520 {
00521 if (data[i].worker->isRunning)
00522 {
00523 SchedulerData &d = data[i];
00524 boost::uint32_t ts = d.worker->timestamp;
00525 boost::system_time st = boost::get_system_time();
00526 if (ts != d.lastReadedTStamp)
00527 {
00528 d.lastReadedTStamp = ts;
00529 d.timeout = st;
00530
00531 d.timeout += boost::posix_time::milliseconds(100);
00532 }
00533 else
00534 {
00535
00536 if (st >= d.timeout)
00537 {
00538
00539 d.worker->detached = true;
00540
00541 d.lastReadedTStamp = UInt32Maximum;
00542 d.worker = new Worker;
00543 boost::thread(WorkerRunner(d.worker)).detach();
00544 }
00545 }
00546 }
00547 }
00548
00549 boost::system_time st = boost::get_system_time();
00550 st += boost::posix_time::milliseconds(1);
00551 boost::this_thread::sleep(st);
00552 }
00553 for (boost::uint32_t i = 0; i < hc; ++i)
00554 {
00555 data[i].worker->quit = true;
00556 ACEDIA_MEMORY_BARRIER();
00557
00558 while (!data[i].worker->exited) boost::this_thread::yield();
00559 }
00560 delete[] data;
00561 ACEDIA_MEMORY_BARRIER();
00562 exited = true;
00563 }
00564
00565 namespace details
00566 {
00567 void enqueueToScheduler(Actor *a) { scheduler.enqueue(a); }
00568 }
00569
00570 void waitForAllActorsDone()
00571 {
00572 static boost::mutex mut;
00573 boost::unique_lock<boost::mutex> lock(mut);
00574 while (0 != runningActors)
00575 {
00576
00577 boost::system_time timeout = boost::get_system_time();
00578 timeout += boost::posix_time::milliseconds(50);
00579 (void) noRunningActorsCv.timed_wait(lock, timeout);
00580 }
00581 }
00582
00583 void shutdown()
00584 {
00585 scheduler.run = false;
00586 while (!scheduler.exited) boost::this_thread::yield();
00587 #ifdef ACEDIA_WINDOWS
00588 WSACleanup();
00589 #endif
00590 }
00591
00592 namespace details
00593 {
00594 ActorRef doSpawn(Actor *a)
00595 {
00596
00597
00598 a->ref();
00599 if (a->isDetached())
00600 {
00601
00602
00603 Worker *w = new Worker(a);
00604 boost::thread(WorkerRunner(w)).detach();
00605 }
00606 else
00607 {
00608 if (0 == schedulerStartetFlag
00609 && 1 == atomic::addAndFetch(&schedulerStartetFlag, 1))
00610 {
00611 #ifdef ACEDIA_WINDOWS
00612 WSADATA wsa;
00613 if (0 != WSAStartup(MAKEWORD(2,0), &wsa))
00614 {
00615 ACEDIA_THROW(WSAStartupFailedException, "WSAStartup(MAKEWORD(2,0), &wsa) failed");
00616 }
00617 #endif
00618 boost::thread(boost::ref(scheduler)).detach();
00619 }
00620 ACEDIA_MEMORY_BARRIER();
00621 scheduler.enqueue(a);
00622 }
00623 if (!(a->state() & HIDDEN)) incRunningActors();
00624 return a->self();
00625 }
00626 }
00627
00628 }
00629
00630
00631
00632
00633
00634
00635 static acedia::String m_application_name = "";
00636 static acedia::String m_application_id = "(Version 0.0)";
00637
00638 static boost::uint32_t m_app_version_settings[4] = {0, 0, 0, 0};
00639
00640 static bool m_allowNewerMinorVersion = false;
00641 static bool m_allowNewerMajorVersion = false;
00642
00643 static std::map<acedia::String, acedia::ActorRef> m_namesMap;
00644 static boost::mutex m_namesMutex;
00645
00646 namespace acedia
00647 {
00648 bool registerName(const String &name, const ActorRef &actor)
00649 {
00650 boost::mutex::scoped_lock lock(m_namesMutex);
00651 ActorRef &ar = m_namesMap[name];
00652 ar = actor;
00653 return true;
00654 }
00655
00656 ActorRef whereisName(const String &name)
00657 {
00658 boost::mutex::scoped_lock lock(m_namesMutex);
00659 std::map<String, ActorRef>::const_iterator ci = m_namesMap.find(name);
00660 if (ci != m_namesMap.end())
00661 {
00662 return ci->second;
00663 }
00664 else if (name == "acedia_TimeEmitter")
00665 {
00666 ActorRef emitter = spawn<TimeEmitter>();
00667 m_namesMap.insert(std::map<String, ActorRef>::value_type(String("acedia_TimeEmitter"), emitter));
00668 return emitter;
00669 }
00670 else
00671 {
00672 return ActorRef();
00673 }
00674 }
00675
00676 void setAppInfo(const char *appName,
00677 boost::uint32_t majorVersion,
00678 boost::uint32_t minorVersion,
00679 boost::uint32_t minCompatibleMajorVersion,
00680 boost::uint32_t minCompatibleMinorVersion,
00681 bool allowNewerMinorVersion,
00682 bool allowNewerMajorVersion)
00683 {
00684 m_application_name = appName;
00685 m_app_version_settings[0] = majorVersion;
00686 m_app_version_settings[1] = minorVersion;
00687 m_app_version_settings[2] = minCompatibleMajorVersion;
00688 m_app_version_settings[3] = minCompatibleMinorVersion;
00689 m_allowNewerMajorVersion = allowNewerMajorVersion;
00690 m_allowNewerMinorVersion = allowNewerMinorVersion;
00691 std::ostringstream os;
00692 os << appName << " (Version " << majorVersion << "."
00693 << minorVersion << ")";
00694 m_application_id = os.str();
00695 }
00696 namespace details
00697 {
00698
00699 const String &appIdentifier()
00700 {
00701 return m_application_id;
00702 }
00703 const String &appName() { return m_application_name; }
00704 boost::uint32_t appMajorVersion()
00705 {
00706 return m_app_version_settings[0];
00707 }
00708 boost::uint32_t appMinorVersion()
00709 {
00710 return m_app_version_settings[1];
00711 }
00712
00713
00714 bool isCompatible(const String& appn,
00715 boost::uint32_t major,
00716 boost::uint32_t minor)
00717 {
00718 return appn == appName()
00719 && major >= m_app_version_settings[2]
00720 && minor >= m_app_version_settings[3]
00721 && ((m_allowNewerMajorVersion && major > appMajorVersion()) || major <= appMajorVersion())
00722 && ((m_allowNewerMinorVersion && minor > appMinorVersion()) || minor <= appMinorVersion());
00723 }
00724 }
00725 void link(ActorRef actor1, ActorRef actor2)
00726 {
00727 actor1.actor->linkTo(actor2);
00728 }
00729 }