00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #define ACEDIA_REDUCED_ANNOUNCE
00031 
00032 #ifndef _XOPEN_SOURCE
00033 #  define _XOPEN_SOURCE
00034 #endif
00035 
00036 
00037 #include <list>
00038 #include <sstream>
00039 #include <set>
00040 
00041 
00042 #include <boost/thread.hpp>
00043 #include <boost/cstdint.hpp>
00044 #include <boost/thread/mutex.hpp>
00045 #include <boost/thread/condition_variable.hpp>
00046 
00047 
00048 #include "actor.hpp"
00049 #include "acedia.hpp"
00050 #include "acedia_atomic.hpp"
00051 #include "acedia_util.hpp"
00052 
00053 #include <iostream>
00054 using std::cout;
00055 using std::endl;
00056 
00057 namespace acedia
00058 {
00059     namespace details
00060     {
00061         class InvokeVoidFunctor : public AnyArrayProcessor
00062         {
00063 
00064             boost::function0<void> f;
00065 
00066         public:
00067 
00068             InvokeVoidFunctor(const boost::function0<void> &func) : f(func) { }
00069 
00070             virtual bool operator()(const AnyArray &)
00071             {
00072                 f();
00073                 return true;
00074             }
00075 
00076             virtual ContinuationInvoker *matchingInvoker(const AnyArray &arr)
00077             {
00078                 return new ContinuationInvoker(this, &arr);
00079             }
00080 
00081         };
00082 
00083     }
00084 
00085     void discard() { }
00086 
00087     details::AnyArrayProcessor* MatchAllRuleBuilder::operator>>(void (*voidFunction)())
00088     {
00089         return ((*this) >> boost::function0<void>(voidFunction));
00090     }
00091 
00092     details::AnyArrayProcessor* MatchAllRuleBuilder::operator>>(const boost::function0<void> &f)
00093     {
00094         return new details::InvokeVoidFunctor(f);
00095     }
00096 
00097     static boost::int32_t schedulerStartetFlag = 0;
00098 
00099     static boost::int32_t runningActors = 0;
00100 
00101     static boost::condition_variable noRunningActorsCv;
00102 
00103     inline void incRunningActors()
00104     {
00105         (void) atomic::addAndFetch(&runningActors, 1);
00106     }
00107 
00108     inline void decRunningActors()
00109     {
00110         if (0 == atomic::addAndFetch(&runningActors, -1))
00111             noRunningActorsCv.notify_all();
00112     }
00113 
00114     
00115 
00116 
00117 
00118     struct TimeEmitter : public Actor
00119     {
00120 
00121 
00122         typedef std::multimap<boost::system_time ,
00123                               Message >
00124                 TimeoutMap;
00125         typedef TimeoutMap::value_type TimeoutMapEntry;
00126 
00127         TimeoutMap timeouts;
00128 
00129         TimeEmitter() : Actor(HIDDEN), timeouts() { }
00130 
00131         void handleRequest(ActorRef requester, boost::int32_t reqId, boost::uint32_t msTimeout)
00132         {
00133             assert(requester.isValid());
00134             boost::system_time tOut = boost::get_system_time();
00135             tOut += boost::posix_time::milliseconds(msTimeout);
00136             timeouts.insert(TimeoutMapEntry(tOut,
00137                                             Message(requester, requester,
00138                                                     TimeoutEvent(reqId))));
00139 
00140         }
00141 
00142         bool timeoutHappens()
00143         {
00144             boost::system_time st = boost::get_system_time();
00145             TimeoutMap::iterator it = timeouts.begin();
00146             if (it != timeouts.end() && (it->first) < st)
00147             {
00148 
00149 
00150                 Message delivery = it->second;
00151                 ActorRef whom = delivery.receiver();
00152                 whom.send(delivery);
00153 
00154                 timeouts.erase(it);
00155                 return true;
00156             }
00157             return false;
00158         }
00159 
00160         Message createFutureMessage(const Message& from)
00161         {
00162             ActorRef s = from.sender();
00163             
00164             
00165             switch (from.length())
00166             {
00167              case  2: return Message(s,s,from.at(1));
00168              case  3: return Message(s,s,from.at(1),from.at(2));
00169              case  4: return Message(s,s,from.at(1),from.at(2),from.at(3));
00170              case  5: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4));
00171              case  6: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5));
00172              case  7: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6));
00173              case  8: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6),from.at(7));
00174              case  9: return Message(s,s,from.at(1),from.at(2),from.at(3),from.at(4),from.at(5),from.at(6),from.at(7),from.at(8));
00175              default: return Message(s,s);
00176             }
00177         }
00178 
00179         virtual void act()
00180         {
00181             Message msg;
00182             MetaClass* timeoutRequestMetaClass =
00183                     getMetaClass("acedia::TimeoutRequest");
00184             MetaClass* futureMessageRequestMetaClass =
00185                     getMetaClass("acedia::FutureMessageRequest");
00186             assert(NULL != timeoutRequestMetaClass);
00187             for (;;)
00188             {
00189                 while (tryReceive(msg))
00190                 {
00191                     if (msg.metaClassAt(0) == timeoutRequestMetaClass)
00192                     {
00193                         const TimeoutRequest& tr =
00194                                 msg.uncheckedValueAt<TimeoutRequest>(0);
00195                         handleRequest(msg.sender(), tr.requestId(), tr.msTimeout());
00196                     }
00197                     else if (msg.metaClassAt(0) == futureMessageRequestMetaClass)
00198                     {
00199                         if (msg.sender().isValid())
00200                         {
00201                             const FutureMessageRequest& req =
00202                                     msg.uncheckedValueAt<FutureMessageRequest>(0);
00203                             boost::uint32_t msTimeout = req.msTimeout();
00204                             boost::system_time tOut = boost::get_system_time();
00205                             tOut += boost::posix_time::milliseconds(msTimeout);
00206                             timeouts.insert(TimeoutMapEntry(tOut,
00207                                                             createFutureMessage(msg)));
00208                         }
00209                     }
00210                 }
00211                 
00212                 while (timeoutHappens())
00213                 {
00214                     
00215                 }
00216                 
00217                 boost::system_time st = boost::get_system_time();
00218                 st += boost::posix_time::milliseconds(1);
00219                 boost::this_thread::sleep(st);
00220             }
00221         }
00222 
00223     };
00224 
00225     
00226 
00227 
00228 
00229     
00230     struct Worker;
00231 
00232     struct SchedulerData
00233     {
00234         Worker *worker;
00235         boost::uint32_t lastReadedTStamp;
00236         boost::system_time timeout;
00237         inline SchedulerData() throw() :
00238                 worker(NULL), lastReadedTStamp(UInt32Maximum),
00239                 timeout(boost::get_system_time())
00240         {
00241         }
00242         ~SchedulerData();
00243     };
00244 
00245     struct Scheduler
00246     {
00247 
00248         volatile bool run;
00249         volatile bool exited;
00250 
00251         details::container::intrusive::SingleReaderQueue<Actor> jobQ;
00252         util::Semaphore jobsQSem;
00253         util::Spinlock jobQLock;
00254 
00255         Scheduler() : run(true), exited(false) { }
00256 
00257         
00258         Actor *getJob()
00259         {
00260             
00261             if (!run) return NULL;
00262             while (!jobsQSem.tryAcquire(1, 30))
00263             {
00264                 if (!run) return NULL;
00265             }
00266             
00267             {
00268                 util::SpinlockGuard guard(jobQLock);
00269                 return jobQ.tryDequeue();
00270             }
00271         }
00272 
00273         
00274         Actor *tryGetJob()
00275         {
00276 
00277             if (jobsQSem.tryAcquire())
00278             {
00279                 util::SpinlockGuard guard(jobQLock);
00280                 return jobQ.tryDequeue();
00281             }
00282             return NULL;
00283         }
00284 
00285         void enqueue(Actor *a)
00286         {
00287 
00288             jobQ.enqueue(a);
00289             jobsQSem.release();
00290         }
00291 
00292         
00293         void operator()();
00294 
00295     } scheduler;
00296 
00297     struct Worker : public ReferenceCounted
00298     {
00299         volatile boost::uint32_t timestamp;
00300         volatile bool detached;
00301         volatile bool quit;
00302         volatile bool isRunning;
00303         volatile bool exited;
00304         boost::mutex mm; 
00305 
00306         Actor *a;
00307 
00308         Worker() : ReferenceCounted(1), timestamp(0), detached(false),
00309                 quit(false), isRunning(false), exited(false), a(NULL)
00310         {
00311         }
00312 
00313         Worker(Actor *detachedActor) : ReferenceCounted(1), timestamp(0),
00314                 detached(true), quit(false), isRunning(false), exited(false),
00315                 a(NULL)
00316         {
00317             assert(detachedActor->isDetached());
00318             a = detachedActor;
00319         }
00320 
00321         void waitUntilDetachedActorBecomesReady()
00322         {
00323             boost::unique_lock<boost::mutex> lock(mm);
00324             while (!a->m_lrmSet)
00325             {
00326                 
00327                 
00328                 boost::system_time timeout = boost::get_system_time();
00329                 timeout += boost::posix_time::milliseconds(25);
00330                 a->m_lrmSetCondition.timed_wait(lock, timeout);
00331             }
00332         }
00333 
00334         void runDetached()
00335         {
00336             Context ctx;
00337             assert(NULL != a && a->isDetached());
00338             
00339             if (a->state() & UNINITIALIZED) a->initialize();
00340             
00341             
00342             volatile ActorState nextState;
00343             a->yieldState = &nextState;
00344             while (!quit)
00345             {
00346                 ACEDIA_MEMORY_BARRIER();
00347                 a->resume(&ctx);
00348                 ACEDIA_MEMORY_BARRIER();
00349                 switch (nextState)
00350                 {
00351                     case BLOCKED:
00352                         waitUntilDetachedActorBecomesReady();
00353                         break;
00354 
00355                     case READY: break; 
00356 
00357                     case EXITED:
00358                         
00359                         
00360                         timestamp = timestamp + 1;
00361                         if (a->state() == DETACHED)
00362                         {
00363                             assert(a->setState(DETACHED, EXITED));
00364                             
00365                             
00366                             ACEDIA_MEMORY_BARRIER();
00367                             decRunningActors();
00368                         }
00369                         
00370                         else assert(a->setState(HIDDEN, EXITED));
00371                         
00372                         if (!a->deref()) delete a;
00373                         a = NULL;
00374                         quit = true; 
00375                         break;
00376 
00377                     default: throw 666; 
00378                 }
00379             }
00380         }
00381 
00382         void operator()()
00383         {
00384             if (detached)
00385             {
00386                 runDetached();
00387                 exited = true;
00388                 return;
00389             }
00390             ActorState nextState;
00391             Context ctx;
00392             while (!quit)
00393             {
00394                 isRunning = false;
00395                 ACEDIA_MEMORY_BARRIER();
00396                 if (a)
00397                 {
00398                     Actor *nextJob = scheduler.tryGetJob();
00399                     
00400                     if (nextJob)
00401                     {
00402                         
00403                         scheduler.enqueue(a);
00404                         
00405                         a = nextJob;
00406                     }
00407                     
00408                 }
00409                 else
00410                 {
00411                     
00412                     a = scheduler.getJob();
00413                     if (!a)
00414                     {
00415                         quit = true;
00416                         ACEDIA_MEMORY_BARRIER();
00417                         exited = true;
00418                         return;
00419                     }
00420                 }
00421                 timestamp = timestamp + 1;
00422                 
00423                 
00424                 ACEDIA_MEMORY_BARRIER();
00425                 isRunning = true;
00426                 if (a->state() & UNINITIALIZED) a->initialize();
00427                 assert(!a->isDetached());
00428                 a->yieldState = &nextState;
00429                 assert(a->setState(READY, RUNNING));
00430                 ACEDIA_MEMORY_BARRIER();
00431                 a->resume(&ctx);
00432                 ACEDIA_MEMORY_BARRIER();
00433                 
00434                 if (detached)
00435                 {
00436                     if (nextState == EXITED)
00437                     {
00438                         assert(a->setState(RUNNING, EXITED));
00439                         if (!a->deref()) delete a;
00440                         a = NULL;
00441                     }
00442                     else
00443                     {
00444                         
00445                         waitUntilDetachedActorBecomesReady();
00446                         runDetached();
00447                     }
00448                     decRunningActors();
00449                     exited = true;
00450                     return; 
00451                 }
00452                 else switch(nextState)
00453                 {
00454                     case WAITING:
00455                     case BLOCKED:
00456                         assert(a->setState(RUNNING, nextState));
00457                         a = NULL; 
00458                         break;
00459 
00460                     case READY:
00461                         assert(a->setState(RUNNING, READY));
00462                         break;
00463 
00464                     case EXITED:
00465                         
00466                         
00467                         timestamp = timestamp + 1;
00468                         ACEDIA_MEMORY_BARRIER();
00469                         assert(a->setState(RUNNING, EXITED));
00470                         ACEDIA_MEMORY_BARRIER();
00471                         if (!a->deref()) delete a;
00472                         a = NULL;
00473                         decRunningActors();
00474                         break;
00475 
00476                     default: throw 666; 
00477                 }
00478             }
00479             exited = true;
00480         }
00481 
00482     };
00483 
00484     struct WorkerRunner
00485     {
00486         Worker *w;
00487         WorkerRunner(Worker *worker) : w(worker) { if(w) w->ref(); }
00488         ~WorkerRunner()
00489         {
00490             if (w && !w->deref()) delete w;
00491         }
00492         WorkerRunner(const WorkerRunner &other) : w(other.w)
00493         {
00494             if (w) w->ref();
00495         }
00496         inline void operator()()
00497         {
00498             if (w) (*w)();
00499         }
00500     };
00501 
00502     SchedulerData::~SchedulerData()
00503     {
00504         if (worker && !worker->deref()) delete worker;
00505     }
00506 
00507     void Scheduler::operator()()
00508     {
00509         boost::uint32_t hc = boost::thread::hardware_concurrency();
00510         SchedulerData *data = new SchedulerData[hc];
00511         for (boost::uint32_t i = 0; i < hc; ++i)
00512         {
00513             Worker *w = new Worker;
00514             data[i].worker = w;
00515             boost::thread(WorkerRunner(w)).detach();
00516         }
00517         while (run)
00518         {
00519             for (boost::uint32_t i = 0; i < hc; ++i)
00520             {
00521                 if (data[i].worker->isRunning)
00522                 {
00523                     SchedulerData &d = data[i];
00524                     boost::uint32_t ts = d.worker->timestamp;
00525                     boost::system_time st = boost::get_system_time();
00526                     if (ts != d.lastReadedTStamp)
00527                     {
00528                         d.lastReadedTStamp = ts;
00529                         d.timeout = st;
00530                         
00531                         d.timeout += boost::posix_time::milliseconds(100);
00532                     }
00533                     else
00534                     {
00535                         
00536                         if (st >= d.timeout)
00537                         {
00538                             
00539                             d.worker->detached = true;
00540                             
00541                             d.lastReadedTStamp = UInt32Maximum;
00542                             d.worker = new Worker;
00543                             boost::thread(WorkerRunner(d.worker)).detach();
00544                         }
00545                     }
00546                 }
00547             }
00548             
00549             boost::system_time st = boost::get_system_time();
00550             st += boost::posix_time::milliseconds(1);
00551             boost::this_thread::sleep(st);
00552         }
00553         for (boost::uint32_t i = 0; i < hc; ++i)
00554         {
00555             data[i].worker->quit = true;
00556             ACEDIA_MEMORY_BARRIER();
00557             
00558             while (!data[i].worker->exited) boost::this_thread::yield();
00559         }
00560         delete[] data;
00561         ACEDIA_MEMORY_BARRIER();
00562         exited = true;
00563     }
00564 
00565     namespace details
00566     {
00567         void enqueueToScheduler(Actor *a) { scheduler.enqueue(a); }
00568     }
00569 
00570     void waitForAllActorsDone()
00571     {
00572         static boost::mutex mut;
00573         boost::unique_lock<boost::mutex> lock(mut);
00574         while (0 != runningActors)
00575         {
00576             
00577             boost::system_time timeout = boost::get_system_time();
00578             timeout += boost::posix_time::milliseconds(50);
00579             (void) noRunningActorsCv.timed_wait(lock, timeout);
00580         }
00581     }
00582 
00583     void shutdown()
00584     {
00585         scheduler.run = false;
00586         while (!scheduler.exited) boost::this_thread::yield();
00587 #ifdef ACEDIA_WINDOWS
00588          WSACleanup();
00589 #endif
00590     }
00591 
00592     namespace details
00593     {
00594         ActorRef doSpawn(Actor *a)
00595         {
00596             
00597             
00598             a->ref();
00599             if (a->isDetached())
00600             {
00601                 
00602                 
00603                 Worker *w = new Worker(a);
00604                 boost::thread(WorkerRunner(w)).detach();
00605             }
00606             else
00607             {
00608                 if (0 == schedulerStartetFlag
00609                     && 1 == atomic::addAndFetch(&schedulerStartetFlag, 1))
00610                 {
00611 #ifdef ACEDIA_WINDOWS
00612                     WSADATA wsa;
00613                     if (0 != WSAStartup(MAKEWORD(2,0), &wsa))
00614                     {
00615                         ACEDIA_THROW(WSAStartupFailedException, "WSAStartup(MAKEWORD(2,0), &wsa) failed");
00616                     }
00617 #endif
00618                     boost::thread(boost::ref(scheduler)).detach();
00619                 }
00620                 ACEDIA_MEMORY_BARRIER();
00621                 scheduler.enqueue(a);
00622             }
00623             if (!(a->state() & HIDDEN)) incRunningActors();
00624             return a->self();
00625         }
00626     }
00627 
00628 } 
00629 
00630 
00631 
00632 
00633 
00634 
00635 static acedia::String m_application_name = "";
00636 static acedia::String m_application_id = "(Version 0.0)";
00637 
00638 static boost::uint32_t m_app_version_settings[4] = {0, 0, 0, 0};
00639 
00640 static bool m_allowNewerMinorVersion = false;
00641 static bool m_allowNewerMajorVersion = false;
00642 
00643 static std::map<acedia::String, acedia::ActorRef> m_namesMap;
00644 static boost::mutex m_namesMutex;
00645 
00646 namespace acedia
00647 {
00648     bool registerName(const String &name, const ActorRef &actor)
00649     {
00650         boost::mutex::scoped_lock lock(m_namesMutex);
00651         ActorRef &ar = m_namesMap[name];
00652         ar = actor;
00653         return true;
00654     }
00655 
00656     ActorRef whereisName(const String &name)
00657     {
00658         boost::mutex::scoped_lock lock(m_namesMutex);
00659         std::map<String, ActorRef>::const_iterator ci = m_namesMap.find(name);
00660         if (ci != m_namesMap.end())
00661         {
00662             return ci->second;
00663         }
00664         else if (name == "acedia_TimeEmitter")
00665         {
00666             ActorRef emitter = spawn<TimeEmitter>();
00667             m_namesMap.insert(std::map<String, ActorRef>::value_type(String("acedia_TimeEmitter"), emitter));
00668             return emitter;
00669         }
00670         else
00671         {
00672             return ActorRef();
00673         }
00674     }
00675 
00676     void setAppInfo(const char *appName,
00677                     boost::uint32_t majorVersion,
00678                     boost::uint32_t minorVersion,
00679                     boost::uint32_t minCompatibleMajorVersion,
00680                     boost::uint32_t minCompatibleMinorVersion,
00681                     bool allowNewerMinorVersion,
00682                     bool allowNewerMajorVersion)
00683     {
00684         m_application_name = appName;
00685         m_app_version_settings[0] = majorVersion;
00686         m_app_version_settings[1] = minorVersion;
00687         m_app_version_settings[2] = minCompatibleMajorVersion;
00688         m_app_version_settings[3] = minCompatibleMinorVersion;
00689         m_allowNewerMajorVersion = allowNewerMajorVersion;
00690         m_allowNewerMinorVersion = allowNewerMinorVersion;
00691         std::ostringstream os;
00692         os << appName << " (Version " << majorVersion << "."
00693            << minorVersion << ")";
00694         m_application_id = os.str();
00695     }
00696     namespace details
00697     {
00698         
00699         const String &appIdentifier()
00700         {
00701             return m_application_id;
00702         }
00703         const String &appName() { return m_application_name; }
00704         boost::uint32_t appMajorVersion()
00705         {
00706             return m_app_version_settings[0];
00707         }
00708         boost::uint32_t appMinorVersion()
00709         {
00710             return m_app_version_settings[1];
00711         }
00712         
00713         
00714         bool isCompatible(const String& appn,
00715                           boost::uint32_t major,
00716                           boost::uint32_t minor)
00717         {
00718             return appn == appName()
00719                    && major >= m_app_version_settings[2]
00720                    && minor >= m_app_version_settings[3]
00721                    && ((m_allowNewerMajorVersion && major > appMajorVersion()) || major <= appMajorVersion())
00722                    && ((m_allowNewerMinorVersion && minor > appMinorVersion()) || minor <= appMinorVersion());
00723         }
00724     }
00725     void link(ActorRef actor1, ActorRef actor2)
00726     {
00727         actor1.actor->linkTo(actor2);
00728     }
00729 }