#ifndef ReceiveCompressedBuffer_hh #define ReceiveCompressedBuffer_hh #include #if CHAR_BIT != 8 #error Only 8 bit char supported #endif #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) #define WIN32_LEAN_AND_MEAN #include #else #include #include #include #include #include #endif #include #include #include #include #include #include #include #include #include #include #include // Reads a sequence of unsigned integer byte values // in variable-length format and composes a 32 bit // integer. inline uint32_t decode(int sock) { uint32_t N = 0; uint32_t shift = 1; for (;;) { uint8_t abyte; PersistentRead(sock, &abyte, 1); N += (abyte & 127) * shift; if ((abyte & 128) == 0) { break; } shift <<= 7; N += shift; } return N; } template class ReceiveCompressedBuffer { bool fresh_; uint32_t cmpMsgLen_; uint32_t cmpRead_; uint32_t compressedBufsize_; uint32_t bufsize_; uint32_t recIndex_; uint32_t bytesRemainingInMsg_; uint32_t bytesAvailableInBuffer_; unsigned char* compressedBuf_; unsigned char* buf_; R reader_; public: #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) #ifdef EE_FILEIO HANDLE sock_; #else SOCKET sock_; #endif #else int sock_; #endif explicit ReceiveCompressedBuffer(uint32_t compressedBufsize) : fresh_(true), cmpMsgLen_(0), compressedBufsize_(compressedBufsize), bufsize_(5 * compressedBufsize), recIndex_(0), bytesRemainingInMsg_(0), bytesAvailableInBuffer_(0) { if (compressedBufsize_ > 0) { compressedBuf_ = new unsigned char[compressedBufsize_ + bufsize_]; buf_ = compressedBuf_ + compressedBufsize_; } } ~ReceiveCompressedBuffer() { delete [] compressedBuf_; } void Give(uint8_t& value) { value = Get(); } void Give(uint16_t& value) { reader_.Read(*this, value); } void Give(uint32_t& value) { reader_.Read(*this, value); } void Give(uint64_t& value) { reader_.Read(*this, value); } void Give(int8_t& value) { value = Get(); } void Give(int16_t& value) { uint16_t tmp; reader_.Read(*this, tmp); value = tmp; } void Give(int32_t& value) { uint32_t tmp; reader_.Read(*this, tmp); value = tmp; } void Give(int64_t& value) { uint64_t tmp; reader_.Read(*this, tmp); value = tmp; } void Give(float& value) { reader_.Read(*this, value); } void Give(double& value) { reader_.Read(*this, value); } template void Give(std::complex cmplx) { T value; Give(value); cmplx.real(value); Give(value); cmplx.imag(value); } template void GiveBlock(T* data, unsigned int elements) { reader_.ReadBlock(*this, data, elements); } void GetCompressed() { fresh_ = true; cmpRead_ += PersistentRead(sock_, compressedBuf_ + cmpRead_, cmpMsgLen_ - cmpRead_); if (cmpRead_ < cmpMsgLen_) { fresh_ = false; return; } bytesAvailableInBuffer_ = bufsize_; int bzrc = BZ2_bzBuffToBuffDecompress((char*)buf_, &bytesAvailableInBuffer_, (char*)compressedBuf_, cmpMsgLen_, 0, 0); if (bzrc != BZ_OK) { throw failure("ReceiveCompressedBuffer::GetCompressed -- bzBuffToBuffDecompress failed ") << bzrc; } recIndex_ = 0; } bool GotPacket() { if (fresh_) { cmpRead_ = 0; cmpMsgLen_ = decode(sock_); if (cmpMsgLen_ > compressedBufsize_) { throw failure("ReceiveCompressedBuffer::GotPacket -- incoming size too great"); } } GetCompressed(); return fresh_; } void Give(void* address, uint32_t len) { if (len > bytesRemainingInMsg_) { throw failure("ReceiveCompressedBuffer::Give -- len > bytesRemainingInMsg_"); } char* addr = reinterpret_cast (address); if (len > bytesAvailableInBuffer_) { } memcpy(addr, buf_ + recIndex_, len); recIndex_ += len; bytesAvailableInBuffer_ -= len; bytesRemainingInMsg_ -= len; } void GiveFile(lil_string const& name) { uint32_t const temp_buf_size = 20000; char input[temp_buf_size]; uint32_t sz; Give(sz); #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) HANDLE hcom = INVALID_HANDLE_VALUE; hcom = CreateFile(name.c_str(), GENERIC_WRITE, FILE_SHARE_READ, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL,0); if (hcom == INVALID_HANDLE_VALUE) { throw failure("ReceiveCompressedBuffer::GiveFile -- CreateFile failed "); } #undef min while (sz > 0) { uint32_t minBytes = std::min(sz, temp_buf_size); Give(input, minBytes); DWORD dwNumOfBytesWritten = 0; WriteFile(hcom, input, minBytes, &dwNumOfBytesWritten, NULL); sz -= minBytes; } CloseHandle(hcom); #else int fd = open(name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (fd < 0) { throw failure("ReceiveCompressedBuffer::GiveFile -- open failed."); } while (sz > 0) { uint32_t minBytes = std::min(sz, temp_buf_size); Give(input, minBytes); if (write(fd, input, minBytes) != (int32_t)minBytes) { close(fd); throw failure("ReceiveCompressedBuffer::GiveFile -- write failed."); } sz -= minBytes; } close(fd); #endif } unsigned char Get() { unsigned char byte; if (bytesAvailableInBuffer_ > 0) { byte = buf_[recIndex_]; ++recIndex_; --bytesAvailableInBuffer_; --bytesRemainingInMsg_; } else { Give(&byte, 1); } return byte; } void Reset() { recIndex_ = 0; bytesRemainingInMsg_ = 0; bytesAvailableInBuffer_ = 0; } void SetMsgLength(uint32_t newMsgLength) { bytesRemainingInMsg_ = newMsgLength; } void Debug() { std::cout << "bufsize_ is " << bufsize_ << std::endl; std::cout << "recIndex_ is " << recIndex_ << std::endl; } private: ReceiveCompressedBuffer(ReceiveCompressedBuffer const&); ReceiveCompressedBuffer& operator=(ReceiveCompressedBuffer const&); }; #endif