00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "acedia_util.hpp"
00033 #include "acedia_network.hpp"
00034
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <cstring>
00038
00039 #include <map>
00040
00041 #include "actor.hpp"
00042 #include "actorproxy.hpp"
00043 #include <boost/thread.hpp>
00044
00045 ACEDIA_DECLARE_CASE_CLASS((acedia)(Ping))
00046
00047 #include <iostream>
00048 using std::cout;
00049 using std::cerr;
00050 using std::endl;
00051
00052 ACEDIA_DECLARE_CASE_TUPLE2((acedia)(details)(ProxyActorIds), originalActorId, boost::uint32_t, localProxyId, boost::uint32_t)
00053 ACEDIA_DECLARE_CASE_TUPLE2((acedia)(details)(AddLinkToMap), localActor, boost::uint32_t, proxyActor, acedia::details::ProxyActorIds)
00054 ACEDIA_DECLARE_CASE_TUPLE2((acedia)(details)(RemoveLinkFromMap), localActor, boost::uint32_t, proxyActor, acedia::details::ProxyActorIds)
00055
00056 namespace acedia
00057 {
00058 PublishingResult::~PublishingResult() { }
00059
00060 const char *asString(PublishingError pe)
00061 {
00062 switch (pe)
00063 {
00064 case NO_PUBLISHING_ERROR: return "NO_PUBLISHING_ERROR";
00065 case SERVER_SOCKET_ERROR:
00066 return "SERVER_SOCKET_ERROR (socket can't be created/opened)";
00067 case BINDING_ERROR:
00068 return "BINDING_ERROR (port already in use)";
00069 default: return "-?-";
00070 }
00071 }
00072
00073 const char *PublishingResult::errorAsString() const throw()
00074 {
00075 return asString(error());
00076 }
00077
00078 const char *asString(ConnectionError ce)
00079 {
00080 switch (ce)
00081 {
00082 case CONNECTION_SOCKET_ERROR:
00083 return "CONNECTION_SOCKET_ERROR (socket can't be created/opened)";
00084
00085 case NO_SUCH_HOST_ERROR:
00086 return "NO_SUCH_HOST_ERROR (host name not found)";
00087
00088 case COULD_NOT_CONNECT_TO_HOST_ERROR:
00089 return "COULD_NOT_CONNECT_TO_HOST_ERROR";
00090
00091 case INCOMPATIBLE_ACEDIA_VERSIONS_ERROR:
00092 return "INCOMPATIBLE_ACEDIA_VERSIONS_ERROR";
00093
00094 case INCOMPATIBLE_HOST_ERROR:
00095 return "INCOMPATIBLE_HOST_ERROR";
00096
00097 default: return "-?-";
00098 }
00099 }
00100
00101 namespace details
00102 {
00103
00104 typedef std::map< ActorRef, std::list<ActorRef> > LinkMap;
00105
00106
00107 void fakeExitFromAllDisconnectedLinks(LinkMap& links)
00108 {
00109 cerr << "fakeExitFromAllDisconnectedLinks" << endl;
00110
00111 for (LinkMap::iterator oi = links.begin(); oi != links.end(); ++oi)
00112 {
00113 ActorRef localActor = oi->first;
00114 if (localActor.isFarRef()) cerr << "LOCAL ACTOR IS A FAR REF!!" << endl;
00115 std::list<ActorRef>& remoteActors = oi->second;
00116 std::list<ActorRef>::iterator ii;
00117 for (ii = remoteActors.begin(); ii != remoteActors.end(); ++ii)
00118 {
00119 ActorRef remoteActor = *ii;
00120 Message msg(remoteActor, localActor,
00121 Exit(exit_reasons::REMOTE_LINK_UNREACHABLE));
00122 if (!remoteActor.isFarRef()) cerr << "REMOTE ACTOR IS NOT A FAR REF!!!" << endl;
00123 cerr << "Sended a message from remote actor "
00124 << remoteActor.realActorId()
00125 << " to the local actor "
00126 << localActor.actorId() << endl;
00127 localActor.send(msg);
00128 }
00129 }
00130 }
00131
00132
00133
00134 class Mailman : public acedia::Actor
00135 {
00136
00137 LinkMap links;
00138 NativeSocketType m_socket;
00139
00140 void addLinkToMap(const ActorRef& localActor, const ActorRef& remoteActor)
00141 {
00142 links[localActor].push_back(remoteActor);
00143 }
00144
00145 void removeLinkFromMap(const ActorRef& localActor, const ActorRef& remoteActor)
00146 {
00147 links[localActor].remove(remoteActor);
00148 }
00149
00150 protected:
00151
00152 virtual bool filterIncomingMessage(const Message& msg)
00153 {
00154 return filterTimeoutEvents(msg);
00155 }
00156
00157 public:
00158
00159 virtual void linkTo(const ActorRef& who)
00160 {
00161
00162 (void) (who.actor->backlinkTo(self()));
00163 }
00164
00165 virtual bool backlinkTo(ActorRef&)
00166 {
00167
00168
00169 return true;
00170 }
00171
00172
00173 Mailman(NativeSocketType socketDescr) :
00174 Actor(HIDDEN, true), m_socket(socketDescr)
00175 {
00176 }
00177
00178 virtual void act()
00179 {
00180 util::SocketIOStream sio(m_socket);
00181 Message msg;
00182 String str;
00183 Message pingMsg = Message(ActorRef(), ActorRef(), Ping());
00184 String serializedPing = pingMsg.serializeToString();
00185 try
00186 {
00187 for (;;)
00188 {
00189
00190 msg = receive();
00191
00192 {
00193
00194 if (msg.match<AddRemoteLink>())
00195 {
00196 addLinkToMap(msg.sender(), msg.receiver());
00197 }
00198 else if (msg.match<Exit>())
00199 {
00200 removeLinkFromMap(msg.sender(), msg.receiver());
00201 }
00202 else if (msg.match<AddLinkToMap>())
00203 {
00204 const AddLinkToMap& cmd = msg.uncheckedValueAt<AddLinkToMap>(0);
00205 const ProxyActorIds& proxyIds = cmd.proxyActor();
00206 ActorRef a1 = getInstanceById(cmd.localActor());
00207 ActorRef a2 = getInstanceById(proxyIds.localProxyId());
00208 addLinkToMap(a1, a2);
00209 continue;
00210 }
00211 else if (msg.match<RemoveLinkFromMap>())
00212 {
00213 const RemoveLinkFromMap& cmd = msg.uncheckedValueAt<RemoveLinkFromMap>(0);
00214 const ProxyActorIds& proxyIds = cmd.proxyActor();
00215 ActorRef a1 = getInstanceById(cmd.localActor());
00216 ActorRef a2 = getInstanceById(proxyIds.localProxyId());
00217 removeLinkFromMap(a1, a2);
00218 continue;
00219 }
00220
00221
00222 str = msg.serializeToString();
00223
00224 sio << (str);
00225 }
00226
00227
00228
00229
00230
00231 }
00232 }
00233
00234
00235
00236
00237 catch (...)
00238 {
00239
00240 }
00241
00242 sio.close();
00243 fakeExitFromAllDisconnectedLinks(links);
00244 }
00245
00246 };
00247
00248
00249
00250 class PostOffice : public acedia::Actor, public details::ProxyFactory
00251 {
00252 typedef std::map<boost::uint32_t, ActorRef> ProxyMap;
00253
00254 NativeSocketType m_socket;
00255 ProxyMap m_proxys;
00256 ActorRef m_mailman;
00257
00258 public:
00259
00260 PostOffice(NativeSocketType socketDescr) :
00261 Actor(HIDDEN, true),
00262 m_socket(socketDescr)
00263 {
00264 m_mailman = spawn<Mailman>(socketDescr);
00265 }
00266
00267 void act()
00268 {
00269 util::SocketIOStream sio(m_socket);
00270 try
00271 {
00272 for (;;)
00273 {
00274
00275 sio.readStringToBuffer();
00276
00277 Message msg = Message::deserializeFrom(sio.buffer(),
00278 this);
00279
00280 if (msg.match<AddRemoteLink>())
00281 {
00282 const ActorRef& a1 = msg.receiver();
00283 const ActorRef& a2 = msg.sender();
00284 send(m_mailman, AddLinkToMap(a1.actorId(), ProxyActorIds(a2.realActorId(), a2.actorId())));
00285 }
00286 else if (msg.match<Exit>())
00287 {
00288 const ActorRef& a1 = msg.receiver();
00289 const ActorRef& a2 = msg.sender();
00290 send(m_mailman, RemoveLinkFromMap(a1.actorId(), ProxyActorIds(a2.realActorId(), a2.actorId())));
00291 }
00292
00293 else if (msg.match<Ping>()) continue;
00294
00295
00296 msg.receiver().send(msg);
00297
00298
00299 yield(READY);
00300 }
00301 }
00302
00303
00304
00305
00306
00307
00308
00309
00310 catch (...)
00311 {
00312
00313 }
00314
00315 sio.close();
00316 }
00317
00318 ActorRef get(boost::uint32_t originalActorId)
00319 {
00320 if (originalActorId == 0) return ActorRef();
00321 else
00322 {
00323 ProxyMap::iterator i = m_proxys.find(originalActorId);
00324 if (i == m_proxys.end())
00325 {
00326
00327 ActorRef proxy(new ActorProxy(originalActorId, m_mailman));
00328 m_proxys.insert(ProxyMap::value_type(originalActorId, proxy));
00329 return proxy;
00330 }
00331 else
00332 {
00333
00334 return i->second;
00335 }
00336 }
00337 }
00338
00339 };
00340
00341 class MiddleMan : public acedia::Actor
00342 {
00343
00344 typedef std::map<int , ActorRef > ChildrenMap;
00345
00346 NativeSocketType m_serverSocket;
00347 ChildrenMap m_children;
00348
00349 ActorRef m_publishedActor;
00350
00351 public:
00352
00353 MiddleMan(NativeSocketType serverSocket, const ActorRef& publishedActor) : Actor(HIDDEN), m_serverSocket(serverSocket), m_publishedActor(publishedActor)
00354 {
00355 }
00356
00357 void act()
00358 {
00359 sockaddr addr;
00360 socklen_t addrlen;
00361 for (;;)
00362 {
00363
00364 int newsockfd = accept(m_serverSocket, &addr, &addrlen);
00365 if (newsockfd < 0)
00366 ACEDIA_THROW(IOException, "Invalid socket accepted");
00367
00368 util::SocketIOStream sio(newsockfd);
00369 boost::uint32_t major, minor;
00370 std::string app_name;
00371 sio >> major;
00372 sio >> minor;
00373 sio >> app_name;
00374 if (acedia::details::isCompatible(app_name, major, minor))
00375 {
00376 cout << "Compatible client connected.\n";
00377
00378 sio << (char) 1;
00379
00380 sio << (boost::uint32_t) m_publishedActor.actorId();
00381
00382 ActorRef postOff = spawn<PostOffice>(newsockfd);
00383 m_children.insert(ChildrenMap::value_type(newsockfd,
00384 postOff));
00385 }
00386 else
00387 {
00388 cout << "Client rejected.\n";
00389
00390 sio << (char) 0;
00391 sio.close();
00392
00393
00394 }
00395 }
00396 }
00397
00398 };
00399 }
00400
00401 PublishingResult publish(ActorRef who, boost::uint16_t port)
00402 {
00403 NativeSocketType sockfd;
00404
00405 struct sockaddr_in serv_addr;
00406 sockfd = socket(AF_INET, SOCK_STREAM, 0);
00407 if (sockfd < 0)
00408 {
00409 return SERVER_SOCKET_ERROR;
00410 }
00411 memset((char*) &serv_addr, 0, sizeof(serv_addr));
00412 serv_addr.sin_family = AF_INET;
00413 serv_addr.sin_addr.s_addr = INADDR_ANY;
00414 serv_addr.sin_port = htons(port);
00415 if (bind(sockfd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
00416 {
00417 return BINDING_ERROR;
00418 }
00419 listen(sockfd, 5);
00420
00421 (void) spawn<details::MiddleMan>(sockfd, who);
00422 return NO_PUBLISHING_ERROR;
00423 }
00424
00425 inline std::pair<ActorRef, ConnectionError> getResult(ConnectionError err)
00426 {
00427 return std::make_pair(ActorRef(), err);
00428 }
00429
00430 inline std::pair<ActorRef, ConnectionError> getResult(ActorRef actorRef)
00431 {
00432 return std::make_pair(actorRef, NO_CONNECTION_ERROR);
00433 }
00434
00435 std::pair<ActorRef, ConnectionError> remoteActor(const char *host, boost::uint16_t port)
00436 {
00437 NativeSocketType sockfd;
00438 struct sockaddr_in serv_addr;
00439 struct hostent *server;
00440 sockfd = socket(AF_INET, SOCK_STREAM, 0);
00441 if (sockfd < 0)
00442 return getResult(CONNECTION_SOCKET_ERROR);
00443 server = gethostbyname(host);
00444 if (server == NULL)
00445 return getResult(NO_SUCH_HOST_ERROR);
00446 memset((char *) &serv_addr, 0, sizeof(serv_addr));
00447 serv_addr.sin_family = AF_INET;
00448 (void) memmove((char *)&serv_addr.sin_addr.s_addr,
00449 (char *)server->h_addr,
00450 server->h_length);
00451 serv_addr.sin_port = htons(port);
00452 if (connect(sockfd, (const sockaddr*) &serv_addr,sizeof(serv_addr)) < 0)
00453 return getResult(COULD_NOT_CONNECT_TO_HOST_ERROR);
00454
00455 util::SocketIOStream sio(sockfd);
00456
00457 sio << details::appMajorVersion();
00458
00459 sio << details::appMinorVersion();
00460
00461 sio << details::appName();
00462 boost::uint8_t login_ok;
00463 sio >> login_ok;
00464
00465 if (login_ok)
00466 {
00467 boost::uint32_t remote_actor_id;
00468 sio >> remote_actor_id;
00469 details::PostOffice *po = new details::PostOffice(sockfd);
00470 ActorRef result = po->get(remote_actor_id);
00471 details::doSpawn(po);
00472 return getResult(result);
00473 }
00474 else return getResult(INCOMPATIBLE_HOST_ERROR);
00475 }
00476
00477 }