BinaryProtocol.cpp

См. документацию.
00001 #include "BinaryProtocol.h"
00002 
00003 /*BinaryProtocol::BinaryProtocol()
00004 {
00005 }
00006 
00007 BinaryProtocol::~BinaryProtocol()
00008 {
00009 }*/
00010 static pthread_mutex_t pktid_mutex = PTHREAD_MUTEX_INITIALIZER;
00011 static int pktID=0;
00012 
00013 BinaryProtocol::BinaryProtocol(BaseConfigFile* cf){
00014     //Ваяем мьютекс чтоб дождаться завершения работы потока, при отвязанном потоке
00015     pthread_mutex_init(&exit_mutex,NULL);
00016     pthread_mutex_init(&cs_mutex3,NULL);
00017     pthread_mutex_init(&cs_mutex4,NULL);
00018     pthread_attr_init(&attr);
00019     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
00020     this->cf=cf; 
00021     this->max_pktsize=cf->getModuleAttributeInt(L"lib_file_opertion",L"max_packet_size",40960);
00022     cout<<"m pkt size"<<max_pktsize;
00023 }
00024 
00025 ssize_t BinaryProtocol::readToBuffer(void* buffer, size_t length) {
00026     //int sizeEnd=length;
00027     //while(sizeEnd>0){
00028     return recv(skt_id, buffer, length, MSG_WAITALL);
00029     //if (e<0){
00030     //sizeEnd-=e;
00031     //}else{
00032     //TODO Тут обработка исключения перехват его выше как-то и выход
00033     //                  exit(0);
00034     //}
00035     //  }
00036 }
00037 
00038 void BinaryProtocol::sendBuffer(const void* buffer, int length) {
00039     int err = 0;
00040     int l = length;
00041     char* buffero = (char*) buffer;
00042     while (l > 0) {
00043         err = send(skt_id, buffero, l, 0);
00044         if (err > 0) {
00045             #ifdef debug_output
00046             cout << "A OK";
00047             #endif
00048             buffero += err;
00049             l -= err;
00050         } else {
00051             cout << "A ERR";
00052             break;
00053         }
00054     }
00055 }
00056 
00057 void BinaryProtocol::sendPacket(BibaryPacket packet) {
00058     pthread_mutex_lock(&cs_mutex4);
00059     packet.header.headSize = sizeof (packet.header);
00060     sendBuffer(&packet.header, sizeof (packet.header));
00061     sendBuffer(packet.body, packet.header.bodySize); 
00062     pthread_mutex_unlock(&cs_mutex4);
00063 }
00064 
00065 BibaryPacket BinaryProtocol::readPacket() {
00066     BibaryPacket bp;
00067     if (readToBuffer(&bp.header, sizeof (bp.header)) < sizeof (bp.header)) {
00068         bp.header.headSize = 0;
00069         return bp;
00070     }
00071     if (sizeof (bp.header) < bp.header.headSize) {
00072         //Зарезервировано, на будующее если протокол будет расширятся типа
00073         size_t recervSize = sizeof (bp.header) - bp.header.headSize;
00074         void* buff = malloc(recervSize);
00075         if (readToBuffer(buff, recervSize) < recervSize) {
00076             bp.header.headSize = 0;
00077             free(buff);
00078             return bp;
00079         }
00080         free(buff);
00081 
00082     }
00083     bp.body = malloc(bp.header.bodySize);
00084     if (readToBuffer(bp.body, bp.header.bodySize) < bp.header.bodySize) {
00085         bp.header.headSize = 0;
00086         free(bp.body);
00087     }
00088     return bp;
00089 }
00090 
00091 void BinaryProtocol::setSendBufferData(const void* buffer, size_t size, unsigned long int pktID) {
00092     //size_t pktsize = 40960;
00093     size_t szForSend = size;
00094     char* buffer_cur = (char*) buffer;
00095     while (szForSend > 0) {
00096         BibaryPacket packet;
00097         packet.body = buffer_cur;
00098         packet.header.pktID = pktID;
00099         size_t ssend = max_pktsize;
00100         if (szForSend < max_pktsize) {
00101             ssend = szForSend;
00102         }
00103         packet.header.bodySize = ssend;
00104         sendPacket(packet);
00105 
00106         szForSend -= ssend;
00107         buffer_cur += ssend;
00108     }
00109 }
00110 
00111 void BinaryProtocol::setFillBuffer(void* buffer, size_t size, unsigned long int pktID) {
00112     BufferFiller* bf=new BufferFiller();
00113     bf->bufferSize = size;
00114     bf->forRead = size;
00115 
00116     bf->bufferC = buffer;
00117     bf->bufferS = buffer;
00118 
00119     bf->pktID = pktID;
00120     bf->allReaded=false;
00122     pthread_mutex_init(&(bf->mutex), NULL);
00124 
00125     pthread_cond_init(&(bf->cond_var),NULL);
00126 
00127 
00128     //Вставка и удаление не могут быть одновременными
00129 
00130     pthread_mutex_lock(&cs_mutex3);
00131     //pair < map<unsigned long int, BufferFiller>::iterator, bool> itBF =
00132     bufferFillers.insert(make_pair(pktID, bf));
00133     pthread_mutex_unlock(&cs_mutex3);
00134     
00135 }
00136 
00137 void BinaryProtocol::waiteFillBuffer(unsigned int pktID) {
00138     //TODO Ждем открытия семафора, и убиваем его
00139 
00140     pthread_mutex_lock(&cs_mutex3);
00141     map<unsigned long int, BufferFiller*>::iterator itBF = bufferFillers.find(pktID);
00142     pthread_mutex_unlock(&cs_mutex3);
00143 
00144     if (itBF != bufferFillers.end()) {
00145         //pthread_mutex_unlock(&cs_mutex3);
00146         //pthread_mutex_t& t=;
00147         //cout <<
00149         pthread_mutex_lock(&(itBF->second->mutex));
00150         while (!itBF->second->allReaded){
00151             pthread_cond_wait(&(itBF->second->cond_var),&(itBF->second->mutex));
00152         }
00153         pthread_mutex_unlock(&(itBF->second->mutex));
00155 
00156         pthread_mutex_lock(&cs_mutex3);
00157 
00158         pthread_mutex_destroy(&(itBF->second->mutex));
00159         pthread_cond_destroy(&(itBF->second->cond_var));
00160         delete (itBF->second);
00161         //free(itBF->second.mutex);
00162         bufferFillers.erase(itBF);
00163         
00164         pthread_mutex_unlock(&cs_mutex3);
00166         //pthread_mutex_destroy(&t);
00167     }else{
00168         cout<<"!!!!мютекса нету!!!!"<<endl;
00169     }
00170 }
00171 
00172 void BinaryProtocol::parse() {
00173     BibaryPacket bp;
00174     while (1) {
00175         #ifdef debug_output
00176         cout << "READING BIN PACKET 1" << endl;
00177         #endif
00178         bp = readPacket();
00179         if (bp.header.headSize == 0) {
00180             break;
00181             //return;
00182         }
00183         #ifdef debug_output
00184         cout << "READING BIN PACKET OK id:" << bp.header.pktID << " headsize:" << bp.header.headSize << " bodySize:" << bp.header.bodySize << endl;
00185         #endif
00186         pthread_mutex_lock(&cs_mutex3);
00187         map<unsigned long int, BufferFiller*>::iterator itBF = bufferFillers.find(bp.header.pktID);
00188         if (itBF != bufferFillers.end()) {
00189             int sz = bp.header.bodySize;
00190             if (itBF->second->forRead < bp.header.bodySize) {
00191                 sz = itBF->second->forRead;
00192             }
00193             memcpy(itBF->second->bufferC, bp.body, sz);
00194             itBF->second->forRead -= sz;
00195             itBF->second->bufferC = ((char*) itBF->second->bufferC) + sz;
00196             #ifdef debug_output
00197             cout << "For read size:" << itBF->second.forRead << endl;
00198             #endif
00199             if (itBF->second->forRead == 0) {
00200                 //TODO Открыть семафор
00201                 #ifdef debug_output
00202                 cout << "Opening Semaphore BIN P" << endl;
00203                 #endif
00204 
00205                 pthread_mutex_lock(&(itBF->second->mutex));
00206                 itBF->second->allReaded=true;
00207                 pthread_cond_broadcast(&(itBF->second->cond_var));
00208                 pthread_mutex_unlock(&(itBF->second->mutex));
00209                 
00212 
00213                 //Удаление не может быть произведено паралельно со вставкой и прочем
00214                 //bufferFillers.erase(itBF);
00215             }
00216         }else{
00217             cout<<"!!!!потеряли пакет!!!!"<<endl;
00218         }
00219         pthread_mutex_unlock(&cs_mutex3);
00220         //тут что то делаем с пакетом
00221         //bp=readPacket(); НЕ ПОНЯЛ!
00222         list<BPHandler*>::iterator bpb = handlers.begin();
00223         while (bpb != handlers.end()) {
00224             (*bpb)->handleBinPacket(bp);
00225             ++bpb;
00226         }
00227         free(bp.body);
00228     }
00229 }
00230 
00231 void* binary_threadFunc(void* clnt) {
00232     BinaryProtocol* bp = static_cast<BinaryProtocol*> (clnt);
00233     pthread_mutex_lock(&(bp->exit_mutex));
00234     bp->parse();
00235     #ifdef debug_output
00236     cout<<"Выход из потока"<<endl;
00237     #endif
00238     pthread_mutex_unlock(&(bp->exit_mutex));
00239     return NULL;
00240 }
00241 
00242 unsigned long int BinaryProtocol::getNextId(){
00243     pthread_mutex_lock(&pktid_mutex);
00244     unsigned long int res=++pktID;
00245     pthread_mutex_unlock(&pktid_mutex);
00246     return res;
00247 }
00248 
00249 void BinaryProtocol::startTread(int skt_id) {
00250     //cout<<"вход в поток"<<endl;
00251     this->skt_id = skt_id;
00252     pthread_create(&binThread, &attr, binary_threadFunc, this);
00253 }
00254 BinaryProtocol::~BinaryProtocol(){
00255     //Убиваем сокет чтобы recv вышел не знаю как его лучше завершить?
00256     //cout<<"Деструктор бин протокола"<<endl;
00257     shutdown(skt_id, SHUT_RDWR);
00258     close(skt_id);
00259     pthread_mutex_lock(&exit_mutex);
00260     pthread_mutex_unlock(&exit_mutex);
00261     pthread_mutex_destroy(&exit_mutex);
00262     //
00263     pthread_attr_destroy(&attr);
00264     pthread_mutex_destroy(&cs_mutex3);
00265     pthread_mutex_destroy(&cs_mutex4);
00266     
00267 }