00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #define ACEDIA_REDUCED_ANNOUNCE
00031
00032 #include "acedia_util.hpp"
00033
00034 #include "exceptions.hpp"
00035 #include <boost/thread.hpp>
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 namespace acedia
00046 {
00047 namespace details
00048 {
00049 static const ::ssize_t ui32Size = (::ssize_t) sizeof(boost::uint32_t);
00050 static const int INIT_BUF_SIZE = 512;
00051
00052 struct SocketIOStreamPrivate
00053 {
00054
00055 NativeSocketType m_socket;
00056 char *m_buffer;
00057 boost::uint32_t m_bufferSize;
00058
00059
00060 void readAll(SocketRecvPtr where, boost::uint32_t numBytes, boost::uint32_t offset = 0)
00061 {
00062
00063 SocketRecvPtr offsetWhere = (SocketRecvPtr) (((char*) where) + offset);
00064 ::ssize_t x = ::recv(m_socket, offsetWhere,
00065 numBytes, 0);
00066 if (x < 0)
00067 {
00068 ACEDIA_THROW(IOException, "Failed to read from socket");
00069 }
00070 else if (x == 0)
00071 {
00072 ACEDIA_THROW(IOException, "Cannot read from a closed socket");
00073 }
00074 else if (x < (::ssize_t) numBytes)
00075 {
00076 readAll(where, numBytes - x, offset + x);
00077 }
00078
00079 }
00080
00081 void write(SocketSendPtr buf, ::size_t len)
00082 {
00083 ::ssize_t x = send(m_socket, buf, len, 0);
00084 if (x != (::ssize_t) len)
00085 {
00086 if (x < 0)
00087 {
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 ACEDIA_THROW(IOException, "Failed to write to socket");
00110 }
00111 else if (x == 0)
00112 ACEDIA_THROW(IOException, "send() returned 0 (closed?)");
00113 else
00114 ACEDIA_THROW(IOException, "send() wrote too less bytes");
00115 }
00116 }
00117
00118 SocketIOStreamPrivate(NativeSocketType socketDescriptor) :
00119 m_socket(socketDescriptor), m_bufferSize(INIT_BUF_SIZE)
00120 {
00121 m_buffer = new char[INIT_BUF_SIZE];
00122 }
00123
00124 ~SocketIOStreamPrivate()
00125 {
00126 delete m_buffer;
00127 }
00128
00129 void resizeBuffer(boost::uint32_t newMinSize)
00130 {
00131
00132 if (newMinSize >= (1024*1024))
00133 ACEDIA_THROW(IOException, "Too big message (>= 1MB)");
00134 else
00135 {
00136
00137 m_bufferSize = newMinSize + 100;
00138 delete m_buffer;
00139 m_buffer = new char[m_bufferSize];
00140 }
00141 }
00142
00143 void readToBuffer(boost::uint32_t numBytes)
00144 {
00145 if (numBytes + 1 >= m_bufferSize)
00146 {
00147 resizeBuffer(numBytes + 1);
00148 }
00149 readAll((SocketRecvPtr) m_buffer, numBytes);
00150 }
00151
00152 const char* buffer()
00153 {
00154 return m_buffer;
00155 }
00156
00157 private:
00158
00159 SocketIOStreamPrivate(const SocketIOStreamPrivate& other);
00160
00161 };
00162 }
00163
00164 namespace util
00165 {
00166 String eraseWhitespaces(const char *input)
00167 {
00168 String result;
00169 char c;
00170 for (unsigned int i = 0; input[i] != 0; ++i)
00171 {
00172 c = input[i];
00173 if (c != ' ' && c != '\t') result += c;
00174 }
00175 return result;
00176 }
00177
00178
00179
00180
00181
00182
00183 void Spinlock::lock() throw()
00184 {
00185 ACEDIA_MEMORY_BARRIER();
00186 for (;;)
00187 {
00188 if (atomic::compareAndSwap(&m_value, 0, 1))
00189 {
00190 ACEDIA_MEMORY_BARRIER();
00191 return;
00192 }
00193 }
00194 }
00195
00196 SpinlockGuard::SpinlockGuard(Spinlock &sl) : slock(sl)
00197 {
00198 slock.lock();
00199 }
00200
00201 SpinlockGuard::~SpinlockGuard()
00202 {
00203 slock.unlock();
00204 }
00205
00206 void RWSpinlock::sharedLock()
00207 {
00208 for (;;)
00209 {
00210 boost::int32_t v = m_value;
00211 if (v >= 0 && atomic::compareAndSwap(&m_value, v, v + 1))
00212 {
00213
00214
00215 ACEDIA_MEMORY_BARRIER();
00216 return;
00217 }
00218
00219
00220 }
00221 }
00222
00223 void RWSpinlock::exclusiveLock()
00224 {
00225 for (;;)
00226 {
00227 boost::int32_t v = m_value;
00228 if (v >= 0 && atomic::compareAndSwap(&m_value, v,
00229 (Int32Minimum+v)))
00230 {
00231 ACEDIA_MEMORY_BARRIER();
00232
00233 while (m_value != Int32Minimum)
00234 {
00235 boost::this_thread::yield();
00236 }
00237 ACEDIA_MEMORY_BARRIER();
00238 return;
00239 }
00240 else boost::this_thread::yield();
00241 }
00242 }
00243
00244 SharedLockGuard::SharedLockGuard(RWSpinlock &l) : rwlock(l)
00245 {
00246 l.sharedLock();
00247 }
00248
00249 SharedLockGuard::~SharedLockGuard() { rwlock.sharedUnlock(); }
00250
00251 ExclusiveLockGuard::ExclusiveLockGuard(RWSpinlock &l) : rwlock(l)
00252 {
00253 l.exclusiveLock();
00254 }
00255
00256 ExclusiveLockGuard::~ExclusiveLockGuard() { rwlock.exclusiveUnlock(); }
00257
00258
00259
00260
00261
00262
00263 void Semaphore::release(boost::int32_t n)
00264 {
00265 assert(n > 0);
00266 (void) atomic::addAndFetch(&m_value, n);
00267 ACEDIA_MEMORY_BARRIER();
00268 m_condition.notify_all();
00269 }
00270
00271 void Semaphore::acquire(boost::int32_t n)
00272 {
00273 assert(n > 0);
00274 boost::unique_lock<boost::mutex> lock(m_mutex);
00275 while (available() < n) m_condition.wait(lock);
00276 (void) atomic::addAndFetch(&m_value, -n);
00277 }
00278
00279 bool Semaphore::tryAcquire(boost::int32_t n)
00280 {
00281 assert(n > 0);
00282 boost::unique_lock<boost::mutex> lock(m_mutex);
00283 if (available() >= n)
00284 {
00285 (void) atomic::addAndFetch(&m_value, -n);
00286 return true;
00287 }
00288 return false;
00289 }
00290
00291 bool Semaphore::tryAcquire(boost::int32_t n, boost::uint16_t msTimeout)
00292 {
00293 assert(n > 0);
00294 boost::unique_lock<boost::mutex> lock(m_mutex);
00295 boost::system_time timeout = boost::get_system_time();
00296 timeout += boost::posix_time::milliseconds(msTimeout);
00297 while (available() < n)
00298 {
00299 if (!m_condition.timed_wait(lock, timeout))
00300 {
00301 return false;
00302 }
00303 }
00304 (void) atomic::addAndFetch(&m_value, -n);
00305 return true;
00306 }
00307
00308
00309
00310
00311
00312
/**
 * Returns value with the order of its eight bytes reversed.
 *
 * EndianConverter installs this function when it detects a host that
 * stores the least significant byte first. Applying it twice yields
 * the original value.
 */
boost::int64_t littleEndian64BitFun(boost::int64_t value)
{
    const boost::uint8_t* from = reinterpret_cast<const boost::uint8_t*>(&value);
    // Write straight into a signed result: no unsigned -> signed
    // conversion of the swapped bit pattern is needed.
    boost::int64_t swapped = 0;
    boost::uint8_t* to = reinterpret_cast<boost::uint8_t*>(&swapped);
    // 'register' was dropped from the loop variables: the keyword is
    // removed from the language as of C++17.
    for (int i = 0; i < 8; ++i)
    {
        to[i] = from[7 - i];
    }
    return swapped;
}
00325
/**
 * Identity conversion: hands value back unchanged.
 *
 * EndianConverter installs this function when the host does not
 * store the least significant byte first, so no byte swap is done.
 */
boost::int64_t bigEndian64BitFun(boost::int64_t value)
{
    const boost::int64_t unchanged = value;
    return unchanged;
}
00330
00331 EndianConverter::EndianConverter()
00332 {
00333
00334 boost::uint16_t word = 0x0001;
00335 boost::uint8_t* bytes = reinterpret_cast<boost::uint8_t*>(&word);
00336 if (bytes[0])
00337 {
00338
00339
00340 m_int64Fun = littleEndian64BitFun;
00341 }
00342 else m_int64Fun = bigEndian64BitFun;
00343 }
00344
00345 AbstractEndianAwareStream::~AbstractEndianAwareStream() { }
00346
00347 EndianAwareOutputStream& operator<<(EndianAwareOutputStream& s,
00348 const std::string& str)
00349 {
00350 boost::uint32_t len = str.length();
00351 s << len;
00352 s.write(str.c_str(), len);
00353 return s;
00354 }
00355
00356 template<class StringClass>
00357 void readStringFromStream(EndianAwareInputStream& s, StringClass& str)
00358 {
00359 boost::uint32_t len;
00360 s >> len;
00361
00362 if (len < 128)
00363 {
00364 char stack_buf[128];
00365 s.read(stack_buf, len);
00366 stack_buf[len] = 0;
00367 str = stack_buf;
00368 }
00369 else
00370 {
00371 char *heap_buf = new char[len+1];
00372 s.read(heap_buf, len);
00373 heap_buf[len] = 0;
00374 str = heap_buf;
00375 delete heap_buf;
00376 }
00377 }
00378
00379 void operator>>(EndianAwareInputStream& s, acedia::String& str)
00380 {
00381 readStringFromStream(s, str);
00382 }
00383
00384 void operator>>(EndianAwareInputStream& s, std::string& str)
00385 {
00386 readStringFromStream(s, str);
00387 }
00388
00389 SocketIOStream::SocketIOStream(NativeSocketType socketDescriptor) :
00390 d(new details::SocketIOStreamPrivate(socketDescriptor))
00391 {
00392 }
00393
00394 SocketIOStream::SocketIOStream(const SocketIOStream& other) :
00395 EndianAwareStream(),
00396 d(new details::SocketIOStreamPrivate((other.d)->m_socket))
00397 {
00398 }
00399
00400 SocketIOStream::~SocketIOStream()
00401 {
00402 delete d;
00403 }
00404
00405 void SocketIOStream::close()
00406 {
00407 closesocket(d->m_socket);
00408 }
00409
00410 void SocketIOStream::readStringToBuffer()
00411 {
00412 boost::uint32_t strLen;
00413 readInt(strLen);
00414 if (strLen > 0)
00415 {
00416 d->readToBuffer(strLen);
00417 }
00418 }
00419
00420 const char* SocketIOStream::buffer() { return d->buffer(); }
00421
00422 void SocketIOStream::write(const char* what, ::size_t size)
00423 {
00424 d->write((SocketSendPtr) what, size);
00425 }
00426
00427 void SocketIOStream::read(char* what, ::size_t size)
00428 {
00429 d->readAll((SocketRecvPtr) what, size);
00430 }
00431
00432 void SocketIOStream::readToBuffer(::size_t size)
00433 {
00434 d->readAll((SocketRecvPtr) d->m_buffer, size);
00435 }
00436
00437 }
00438 }