#ifndef SendCompressedBuffer_hh #define SendCompressedBuffer_hh #include #if CHAR_BIT != 8 #error Only 8 bit char supported #endif #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) #define WIN32_LEAN_AND_MEAN #include #else #include #include #include #include #include #endif #include #include #include #include #include #include #include inline uint8_t CalculateIntMarshallingSize(uint32_t val) { if (val < 128) { // (2**7) return 1; } else { if (val < 16384) { // (2**14) return 2; } else { if (val < 2097152) { return 3; } else { if (val < 268435456) { return 4; } else { return 5; } } } } } // Encodes integer into variable-length format. inline void encode(uint32_t N, unsigned char* addr) { while (true) { uint8_t abyte = N & 127; N >>= 7; if (0 == N) { *addr = abyte; break; } abyte |= 128; *addr = abyte; ++addr; N -= 1; } } class SendCompressedBuffer { uint32_t compressedBufsize_; uint32_t bufsize_; uint32_t index_; unsigned char* compressedBuf_; unsigned char* buf_; uint8_t maxBytes_; public: #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) #ifdef EE_FILEIO HANDLE sock_; #else SOCKET sock_; #endif #else int sock_; #endif explicit SendCompressedBuffer(uint32_t compressedBufsize) : compressedBufsize_(compressedBufsize), bufsize_(1.5 * compressedBufsize), index_(0) { if (compressedBufsize_ > 0) { compressedBuf_ = new unsigned char[compressedBufsize_ + bufsize_]; buf_ = compressedBuf_ + compressedBufsize_; } maxBytes_ = CalculateIntMarshallingSize(compressedBufsize_); } void Resize(uint32_t newsize) { compressedBufsize_ = newsize; bufsize_ = 1.5 * compressedBufsize_; index_ = 0; delete [] compressedBuf_; compressedBuf_ = new unsigned char[compressedBufsize_ + bufsize_]; buf_ = compressedBuf_ + compressedBufsize_; maxBytes_ = CalculateIntMarshallingSize(compressedBufsize_); } ~SendCompressedBuffer() { delete [] compressedBuf_; } void Flush() { uint32_t writabledstlen = compressedBufsize_ - maxBytes_; int bzrc = BZ2_bzBuffToBuffCompress(reinterpret_cast (compressedBuf_ + maxBytes_), &writabledstlen, reinterpret_cast (buf_), index_, 7, 0, 0); if (BZ_OK != bzrc) { throw failure("Buffer::Flush -- bzBuffToBuffCompress failed ") << bzrc; } uint8_t actualBytes = CalculateIntMarshallingSize(writabledstlen); encode(writabledstlen, compressedBuf_ + (maxBytes_ - actualBytes)); PersistentWrite(sock_, compressedBuf_ + (maxBytes_ - actualBytes), actualBytes + writabledstlen); index_ = 0; } void Receive(void const* data, uint32_t dlen) { unsigned char const* d2 = reinterpret_cast(data); while (dlen > bufsize_ - index_) { memcpy(buf_ + index_, d2, bufsize_ - index_); d2 += bufsize_ - index_; dlen -= bufsize_ - index_; index_ = bufsize_; Flush(); } memcpy(buf_ + index_, d2, dlen); index_ += dlen; } void Receive32(uint32_t val) { Receive(&val, sizeof(val)); } void ReceiveFile(lil_string const& name) { #if defined(_MSC_VER) || defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__CYGWIN__) HANDLE hcom = INVALID_HANDLE_VALUE; hcom = CreateFile(name.c_str(), GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL,0); if (hcom == INVALID_HANDLE_VALUE) { throw failure("SendCompressedBuffer::ReceiveFile -- CreateFile failed"); } LARGE_INTEGER li; bool bRet = GetFileSizeEx(hcom, &li); uint32_t sz = static_cast (li.LowPart); Receive(&sz, sizeof(sz)); DWORD dwNumOfBytesRead = 0; while (sz > bufsize_ - index_) { ReadFile(hcom, buf_ + index_, bufsize_ - index_, &dwNumOfBytesRead, NULL); sz -= bufsize_ - index_; index_ = bufsize_; Flush(); } ReadFile(hcom, buf_ + index_, sz, &dwNumOfBytesRead, NULL); index_ += sz; CloseHandle(hcom); #else int fd = open(name.c_str(), O_RDONLY); if (fd < 0) { throw failure("SendCompressedBuffer::ReceiveFile -- open failed."); } struct stat sb; if (fstat(fd, &sb) == -1) { close(fd); throw failure("SendCompressedBuffer::ReceiveFile -- fstat failed."); } uint32_t sz = static_cast (sb.st_size); Receive(&sz, sizeof(sz)); //printf("File size: %lld bytes\n", // (long long) sb.st_size); while (sz > bufsize_ - index_) { read(fd, buf_ + index_, bufsize_ - index_); sz -= bufsize_ - index_; index_ = bufsize_; Flush(); } read(fd, buf_ + index_, sz); index_ += sz; close(fd); #endif } private: SendCompressedBuffer(SendCompressedBuffer const&); SendCompressedBuffer& operator=(SendCompressedBuffer const&); }; #endif