См. документацию.00001 #include "BinaryProtocol.h"
00002
00003
00004
00005
00006
00007
00008
00009
// Serialises access to pktID; taken by BinaryProtocol::getNextId().
static pthread_mutex_t pktid_mutex = PTHREAD_MUTEX_INITIALIZER;

// Last packet id handed out by BinaryProtocol::getNextId(), shared by all
// protocol objects in this translation unit. It has the same unsigned type
// that getNextId() returns, so the counter wraps around with well-defined
// behaviour instead of overflowing a signed int.
static unsigned long int pktID = 0;
00012
00013 BinaryProtocol::BinaryProtocol(BaseConfigFile* cf){
00014
00015 pthread_mutex_init(&exit_mutex,NULL);
00016 pthread_mutex_init(&cs_mutex3,NULL);
00017 pthread_mutex_init(&cs_mutex4,NULL);
00018 pthread_attr_init(&attr);
00019 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
00020 this->cf=cf;
00021 this->max_pktsize=cf->getModuleAttributeInt(L"lib_file_opertion",L"max_packet_size",40960);
00022 cout<<"m pkt size"<<max_pktsize;
00023 }
00024
00025 ssize_t BinaryProtocol::readToBuffer(void* buffer, size_t length) {
00026
00027
00028 return recv(skt_id, buffer, length, MSG_WAITALL);
00029
00030
00031
00032
00033
00034
00035
00036 }
00037
00038 void BinaryProtocol::sendBuffer(const void* buffer, int length) {
00039 int err = 0;
00040 int l = length;
00041 char* buffero = (char*) buffer;
00042 while (l > 0) {
00043 err = send(skt_id, buffero, l, 0);
00044 if (err > 0) {
00045 #ifdef debug_output
00046 cout << "A OK";
00047 #endif
00048 buffero += err;
00049 l -= err;
00050 } else {
00051 cout << "A ERR";
00052 break;
00053 }
00054 }
00055 }
00056
00057 void BinaryProtocol::sendPacket(BibaryPacket packet) {
00058 pthread_mutex_lock(&cs_mutex4);
00059 packet.header.headSize = sizeof (packet.header);
00060 sendBuffer(&packet.header, sizeof (packet.header));
00061 sendBuffer(packet.body, packet.header.bodySize);
00062 pthread_mutex_unlock(&cs_mutex4);
00063 }
00064
00065 BibaryPacket BinaryProtocol::readPacket() {
00066 BibaryPacket bp;
00067 if (readToBuffer(&bp.header, sizeof (bp.header)) < sizeof (bp.header)) {
00068 bp.header.headSize = 0;
00069 return bp;
00070 }
00071 if (sizeof (bp.header) < bp.header.headSize) {
00072
00073 size_t recervSize = sizeof (bp.header) - bp.header.headSize;
00074 void* buff = malloc(recervSize);
00075 if (readToBuffer(buff, recervSize) < recervSize) {
00076 bp.header.headSize = 0;
00077 free(buff);
00078 return bp;
00079 }
00080 free(buff);
00081
00082 }
00083 bp.body = malloc(bp.header.bodySize);
00084 if (readToBuffer(bp.body, bp.header.bodySize) < bp.header.bodySize) {
00085 bp.header.headSize = 0;
00086 free(bp.body);
00087 }
00088 return bp;
00089 }
00090
00091 void BinaryProtocol::setSendBufferData(const void* buffer, size_t size, unsigned long int pktID) {
00092
00093 size_t szForSend = size;
00094 char* buffer_cur = (char*) buffer;
00095 while (szForSend > 0) {
00096 BibaryPacket packet;
00097 packet.body = buffer_cur;
00098 packet.header.pktID = pktID;
00099 size_t ssend = max_pktsize;
00100 if (szForSend < max_pktsize) {
00101 ssend = szForSend;
00102 }
00103 packet.header.bodySize = ssend;
00104 sendPacket(packet);
00105
00106 szForSend -= ssend;
00107 buffer_cur += ssend;
00108 }
00109 }
00110
00111 void BinaryProtocol::setFillBuffer(void* buffer, size_t size, unsigned long int pktID) {
00112 BufferFiller* bf=new BufferFiller();
00113 bf->bufferSize = size;
00114 bf->forRead = size;
00115
00116 bf->bufferC = buffer;
00117 bf->bufferS = buffer;
00118
00119 bf->pktID = pktID;
00120 bf->allReaded=false;
00122 pthread_mutex_init(&(bf->mutex), NULL);
00124
00125 pthread_cond_init(&(bf->cond_var),NULL);
00126
00127
00128
00129
00130 pthread_mutex_lock(&cs_mutex3);
00131
00132 bufferFillers.insert(make_pair(pktID, bf));
00133 pthread_mutex_unlock(&cs_mutex3);
00134
00135 }
00136
00137 void BinaryProtocol::waiteFillBuffer(unsigned int pktID) {
00138
00139
00140 pthread_mutex_lock(&cs_mutex3);
00141 map<unsigned long int, BufferFiller*>::iterator itBF = bufferFillers.find(pktID);
00142 pthread_mutex_unlock(&cs_mutex3);
00143
00144 if (itBF != bufferFillers.end()) {
00145
00146
00147
00149 pthread_mutex_lock(&(itBF->second->mutex));
00150 while (!itBF->second->allReaded){
00151 pthread_cond_wait(&(itBF->second->cond_var),&(itBF->second->mutex));
00152 }
00153 pthread_mutex_unlock(&(itBF->second->mutex));
00155
00156 pthread_mutex_lock(&cs_mutex3);
00157
00158 pthread_mutex_destroy(&(itBF->second->mutex));
00159 pthread_cond_destroy(&(itBF->second->cond_var));
00160 delete (itBF->second);
00161
00162 bufferFillers.erase(itBF);
00163
00164 pthread_mutex_unlock(&cs_mutex3);
00166
00167 }else{
00168 cout<<"!!!!мютекса нету!!!!"<<endl;
00169 }
00170 }
00171
00172 void BinaryProtocol::parse() {
00173 BibaryPacket bp;
00174 while (1) {
00175 #ifdef debug_output
00176 cout << "READING BIN PACKET 1" << endl;
00177 #endif
00178 bp = readPacket();
00179 if (bp.header.headSize == 0) {
00180 break;
00181
00182 }
00183 #ifdef debug_output
00184 cout << "READING BIN PACKET OK id:" << bp.header.pktID << " headsize:" << bp.header.headSize << " bodySize:" << bp.header.bodySize << endl;
00185 #endif
00186 pthread_mutex_lock(&cs_mutex3);
00187 map<unsigned long int, BufferFiller*>::iterator itBF = bufferFillers.find(bp.header.pktID);
00188 if (itBF != bufferFillers.end()) {
00189 int sz = bp.header.bodySize;
00190 if (itBF->second->forRead < bp.header.bodySize) {
00191 sz = itBF->second->forRead;
00192 }
00193 memcpy(itBF->second->bufferC, bp.body, sz);
00194 itBF->second->forRead -= sz;
00195 itBF->second->bufferC = ((char*) itBF->second->bufferC) + sz;
00196 #ifdef debug_output
00197 cout << "For read size:" << itBF->second.forRead << endl;
00198 #endif
00199 if (itBF->second->forRead == 0) {
00200
00201 #ifdef debug_output
00202 cout << "Opening Semaphore BIN P" << endl;
00203 #endif
00204
00205 pthread_mutex_lock(&(itBF->second->mutex));
00206 itBF->second->allReaded=true;
00207 pthread_cond_broadcast(&(itBF->second->cond_var));
00208 pthread_mutex_unlock(&(itBF->second->mutex));
00209
00212
00213
00214
00215 }
00216 }else{
00217 cout<<"!!!!потеряли пакет!!!!"<<endl;
00218 }
00219 pthread_mutex_unlock(&cs_mutex3);
00220
00221
00222 list<BPHandler*>::iterator bpb = handlers.begin();
00223 while (bpb != handlers.end()) {
00224 (*bpb)->handleBinPacket(bp);
00225 ++bpb;
00226 }
00227 free(bp.body);
00228 }
00229 }
00230
00231 void* binary_threadFunc(void* clnt) {
00232 BinaryProtocol* bp = static_cast<BinaryProtocol*> (clnt);
00233 pthread_mutex_lock(&(bp->exit_mutex));
00234 bp->parse();
00235 #ifdef debug_output
00236 cout<<"Выход из потока"<<endl;
00237 #endif
00238 pthread_mutex_unlock(&(bp->exit_mutex));
00239 return NULL;
00240 }
00241
00242 unsigned long int BinaryProtocol::getNextId(){
00243 pthread_mutex_lock(&pktid_mutex);
00244 unsigned long int res=++pktID;
00245 pthread_mutex_unlock(&pktid_mutex);
00246 return res;
00247 }
00248
00249 void BinaryProtocol::startTread(int skt_id) {
00250
00251 this->skt_id = skt_id;
00252 pthread_create(&binThread, &attr, binary_threadFunc, this);
00253 }
00254 BinaryProtocol::~BinaryProtocol(){
00255
00256
00257 shutdown(skt_id, SHUT_RDWR);
00258 close(skt_id);
00259 pthread_mutex_lock(&exit_mutex);
00260 pthread_mutex_unlock(&exit_mutex);
00261 pthread_mutex_destroy(&exit_mutex);
00262
00263 pthread_attr_destroy(&attr);
00264 pthread_mutex_destroy(&cs_mutex3);
00265 pthread_mutex_destroy(&cs_mutex4);
00266
00267 }