00001
00002
00003
00004
00005
00006
00007
00008 #include "Protocol.h"
00009 #include <errno.h>
00010
00011
00012
00013
00014
00015 void* node_threadFunc(void* clnt) {
00016 pair<Protocol*, Node*>* pv = static_cast<pair<Protocol*, Node*>*> (clnt);
00017 Protocol* pt = pv->first;
00018 Node* cn = pv->second;
00019 delete pv;
00020 pt->handlerNode(cn);
00021
00022
00023 pthread_mutex_lock(&(pt->mutDelEndNodRu));
00024 --(pt->num_run_threads);
00025 if((pt->num_run_threads)==0){pthread_cond_signal(&(pt->cond_del_element));}
00026 pthread_mutex_unlock(&(pt->mutDelEndNodRu));
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 return NULL;
00038 }
00039
00040 void
00041 characterData(void *userData, const XML_Char *s, int len) {
00042 Protocol* protocol = (Protocol*) userData;
00043 protocol->curNode->addXMLBody(string(s, len));
00044 #ifdef debug_output
00045 static pthread_mutex_t deta_m=PTHREAD_MUTEX_INITIALIZER;
00046 pthread_mutex_lock(&deta_m);
00047 cout<<"deta!:"<<string(s,len)<<endl;
00048 pthread_mutex_unlock(&deta_m);
00049 #endif
00050 }
00051
00052 void
00053 startTag(void *userData, const XML_Char *name, const XML_Char **atts) {
00054 #ifdef debug_output
00055 cout << "start_tag" << endl;
00056 #endif
00057 Protocol* protocol = (Protocol*) userData;
00058 protocol->curDeph++;
00059 map<string, string> attrsMap;
00060 const XML_Char **curAttr = atts;
00061 while (*curAttr != NULL) {
00062 #ifdef debug_output
00063 cout << "name:" << *curAttr << " value: " << *(curAttr + 1) << endl;
00064 #endif
00065 attrsMap[*curAttr] = *(curAttr + 1);
00066 curAttr = curAttr + 2;
00067 }
00068 pthread_mutex_lock(&(protocol->cs_mutexNewNode));
00069 Node* curNode = new Node(name, attrsMap, protocol->curNode);
00070 pthread_mutex_unlock(&(protocol->cs_mutexNewNode));
00071 if(protocol->curNode==NULL){
00072 protocol->topNode=curNode;
00073 }
00074 protocol->curNode = curNode;
00075 if (protocol->curDeph == 1) {
00076 protocol->startXMLTagNodeHandler(curNode);
00077 }
00078 }
00079
00080 void
00081 endTag(void *userData, const XML_Char *name) {
00082 #ifdef debug_output
00083 cout << "end_tag" << endl;
00084 #endif
00085 Protocol* protocol = (Protocol*) userData;
00086 protocol->curDeph--;
00087
00088 Node* curnP = protocol->curNode->getParent();
00089 if (protocol->curDeph == 1) {
00090
00091
00092
00093 pair<Protocol*, Node*>* pv = new pair<Protocol*, Node*>(protocol, protocol->curNode);
00094
00095 pthread_mutex_lock(&(protocol->mutDelEndNodRu));
00096 ++(protocol->num_run_threads);
00097 pthread_mutex_unlock(&(protocol->mutDelEndNodRu));
00099
00100
00101
00102
00104 pthread_create(&(protocol->nodeThread), &(protocol->attr), node_threadFunc, pv);
00105
00106
00107
00108 } else {
00109 if (protocol->curDeph == 0) {
00110 protocol->endXMLTagNodeHandler(protocol->curNode);
00111 }
00112
00113 }
00114 protocol->curNode = curnP;
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00126 #ifdef debug_output
00127 cout << "== end end tag!== " << endl;
00128 #endif
00129 }
00130
00131 void Protocol::startXMLTagNodeHandler(Node* node) {
00132
00133 }
00134
00135 void Protocol::endXMLTagNodeHandler(Node* node) {
00136
00137 }
00138
00139 void Protocol::handlNodes(Node* curNodeL, TagHandelrsMapT::iterator begin, TagHandelrsMapT::iterator end) {
00140 #ifdef debug_output
00141 cout << "handle_nodes" << endl;
00142 #endif
00143 TagHandelrsMapT::iterator it = begin;
00144 while (it != end) {
00145 (*it).second->handleTag(*curNodeL);
00146 it++;
00147 }
00148 }
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162 void Protocol::handlerIQ(Node* curNodeL) {
00163 IQHandelrsMapT::iterator it = iqHandlers.begin();
00164
00165 TagIQ iq(*curNodeL);
00166 while (it != iqHandlers.end()) {
00167 Node* subnode = NULL;
00168 if (it->xmlns != "") {
00169 subnode = iq.getKind(it->xmlns);
00170 if ((subnode != NULL) &&
00171 ((subnode->getName() == it->subtagName) || (it->subtagName == ""))) {
00172 it->handler->handleTag(iq);
00173 }
00174 } else
00175 if (it->subtagName != "") {
00176 _Kinds_iterator it1 = iq.kindsNameBegin(it->subtagName);
00177 _Kinds_iterator it2 = iq.kindsNameEnd(it->subtagName);
00178 if (it1 != it2) {
00179 it->handler->handleTag(iq);
00180 }
00181 } else {
00182 it->handler->handleTag(iq);
00183 }
00184
00185 ++it;
00186 }
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216 manadgeIQResp(iq);
00217
00218
00219
00220
00221
00222 }
00223
00224 void Protocol::handlerNode(Node* curNodeL) {
00225 TagHandelrsMapT::iterator it1;
00226 TagHandelrsMapT::iterator it2;
00227 #ifdef debug_output
00228 cout << "handle node name:" << curNodeL->getName() << " namespace: " << curNodeL->getNamesapace() << endl;
00229 #endif
00230
00231
00232 it1 = tagHandlers.lower_bound(make_pair(curNodeL->getName(), curNodeL->getNamesapace()));
00233 it2 = tagHandlers.upper_bound(make_pair(curNodeL->getName(), curNodeL->getNamesapace()));
00234 handlNodes(curNodeL, it1, it2);
00235
00236
00237 if (curNodeL->getNamesapace().length() > 0) {
00238 it1 = tagHandlers.lower_bound(make_pair(curNodeL->getName(), ""));
00239 it2 = tagHandlers.upper_bound(make_pair(curNodeL->getName(), ""));
00240 handlNodes(curNodeL, it1, it2);
00241 }
00242
00243
00244 it1 = tagHandlers.lower_bound(make_pair("", curNodeL->getNamesapace()));
00245 it2 = tagHandlers.upper_bound(make_pair("", curNodeL->getNamesapace()));
00246 handlNodes(curNodeL, it1, it2);
00247
00248
00249
00250 if (curNodeL->getName() == "iq") {
00251 handlerIQ(curNodeL);
00252 }
00253
00254
00255 pthread_mutex_lock(&cs_mutexNewNode);
00256 delete curNodeL;
00257 pthread_mutex_unlock(&cs_mutexNewNode);
00258 }
00259
00260 void startDocType(void *userData, const XML_Char *doctypeName, const XML_Char *sysid,
00261 const XML_Char *pubid, int has_internal_subset) {
00262 #ifdef debug_output
00263 cout << "start doc type" << endl;
00264 #endif
00265 }
00266
/**
 * DOCTYPE-end callback. Does nothing except print a trace line when
 * debug_output is defined; the argument is ignored.
 */
void endDocType(void *userData) {
    (void)userData;
#ifdef debug_output
    cout << "end doc type" << endl;
#endif
}
00272
00273 void Protocol::parserCreate(int sktID) {
00274 xml_parser = XML_ParserCreate("UTF-8");
00275 if (!xml_parser) {
00276 std::cerr << "Couldn't allocate memory for parser\n";
00277 exit(-1);
00278 }
00279 curDeph = 0;
00280 XML_SetUserData(xml_parser, this);
00281 XML_SetCharacterDataHandler(xml_parser, characterData);
00282 XML_SetElementHandler(xml_parser, startTag, endTag);
00283 XML_SetDoctypeDeclHandler(xml_parser, startDocType, endDocType);
00284 skt_id = sktID;
00285 }
00286
00287 void Protocol::startParsing() {
00288 int len;
00289
00290 do {
00291 #ifdef debug_output
00292 cout << "parse next buffer" << endl;
00293 #endif
00294 len = recv(skt_id, read_Buffer, BUFFSIZE, 0);
00295 if (len <= 0) {
00296 #ifdef debug_output
00297 cout << "1" << endl;
00298 #endif
00299 XML_Parse(xml_parser, read_Buffer, 0, 1);
00300 XML_ParserFree(xml_parser);
00301 break;
00302 } else {
00303 #ifdef debug_output
00304 cout << "2" << endl;
00305 #endif
00306 if (XML_Parse(xml_parser, read_Buffer, len, 0) == XML_STATUS_ERROR) {
00307 cout << " XML ERROR! :" <<
00308 XML_ErrorString(XML_GetErrorCode(xml_parser)) << "at line:" << XML_GetCurrentLineNumber(xml_parser) << endl;
00309 break;
00310 }
00311 }
00312 } while (len > 0);
00313
00314 #ifdef debug_output
00315 cout<<"Ждем все потоки, потоков:"<<num_run_threads<<endl;
00316 #endif
00317
00318
00319 pthread_mutex_lock(&mutDelEndNodRu);
00320
00321 while(num_run_threads!=0){pthread_cond_wait(&cond_del_element,&mutDelEndNodRu);}
00322 pthread_mutex_unlock(&mutDelEndNodRu);
00323
00324 #ifdef debug_output
00325 cout<<"Дождались все потоки"<<endl;
00326 #endif
00327
00328 pthread_mutex_lock(&cs_mutexNewNode);
00329 if (topNode!=NULL){
00330 #ifdef debug_output
00331 cout<<"OK"<<endl;
00332 #endif
00333 delete topNode;
00334
00335 }
00336 pthread_mutex_unlock(&cs_mutexNewNode);
00337 #ifdef debug_output
00338 cout<<"удалили корневую ноду"<<endl;
00339 #endif
00340
00341
00342 #ifdef debug_output
00343 cout << "end working client" << endl;
00344 #endif
00345 }
00346
00347 Protocol::Protocol() {
00348 #ifdef debug_output
00349 cout<<"конструктор протокола"<<endl;
00350 #endif
00351
00352 curDeph = 0;
00353 curNode = NULL;
00354 topNode=NULL;
00355 curNodeSend = NULL;
00356
00357
00358
00359
00360 pthread_mutex_init(&mutDelEndNodRu,NULL);
00361 pthread_cond_init(&cond_del_element,NULL);
00362 num_run_threads=0;
00363
00364 pthread_mutex_init(&cs_mutex,NULL);
00365 pthread_mutex_init(&cs_mutex1,NULL);
00366
00367
00368 pthread_mutex_init(&cs_mutexNewNode,NULL);
00369 pthread_mutex_init(&cs_mutexDEL,NULL);
00370 pthread_attr_init(&attr);
00371 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
00372
00373
00374
00375
00376
00377 }
00378
00379
00380
00381 void Protocol::addTagHandler(TagHandler<Node>::PTagHandler handler, string name, string xmlns) {
00382 tagHandlers.insert(make_pair(make_pair(name, xmlns), handler));
00383 }
00384
00385
00386
00387
00388
00389
00390 void Protocol::removeIQHandler(TagHandler<TagIQ>::PTagHandler handler){
00391 pthread_mutex_lock(&cs_mutexDEL);
00392 IQHandelrsMapT::iterator it=iqHandlers.begin();
00393 while(it!=iqHandlers.end()){
00394 if(it->handler==handler){
00395 iqHandlers.erase(it);
00396 #ifdef debug_output
00397 cout<<"удалено"<<endl;
00398 #endif
00399 break;
00400 }
00401 ++it;
00402 }
00403
00404
00405
00406
00407 pthread_mutex_unlock(&cs_mutexDEL);
00408 }
00409
00410 void Protocol::addIQHandler(TagHandler<TagIQ>::PTagHandler handler, string xmlns, string name) {
00411 #ifdef debug_output
00412 cout << "add iq handler" << name << " " << xmlns << endl;
00413 #endif
00414 IQHandleD hd;
00415 hd.xmlns = xmlns;
00416 hd.subtagName = name;
00417 hd.handler = handler;
00418 pthread_mutex_lock(&cs_mutexDEL);
00419 iqHandlers.push_back(hd);
00420 pthread_mutex_unlock(&cs_mutexDEL);
00421
00422 }
00423
00424 void Protocol::sendString(string data) {
00425 pthread_mutex_lock(&cs_mutex);
00426 #ifdef debug_output
00427 cout << "send string" << endl << "DATA:" << data << endl;
00428 #endif
00429 const char* buf = data.data();
00430 int pos = 0;
00431 int err = 0;
00432 int l = data.length();
00433 while (pos < l) {
00434 err = send(skt_id, buf + pos, l - pos, 0);
00435 if (err > 0) {
00436 #ifdef debug_output
00437 cout << "A OK";
00438 #endif
00439 pos += err;
00440 } else {
00441 cout << "A ERR";
00442 break;
00443 }
00444 }
00445 pthread_mutex_unlock(&cs_mutex);
00446 }
00447
00448
00449 void Protocol ::sendNode(Node &stanza) {
00450 sendString(stanza.toXMLString());
00451 }
00452
00453 void Protocol::sendNodeStart(Node &stanza) {
00454 sendString(stanza.toXMLStringStart());
00455 }
00456
00457 void Protocol::sendNodeEnd(Node &stanza) {
00458 sendString(stanza.toXMLStringEnd());
00459 }
00460
00461 void Protocol::manadgeIQResp(TagIQ& st) {
00462
00463
00464
00465
00466
00467
00468 pthread_mutex_lock(&cs_mutex1);
00469 pair<multimap<string, IQRespD>::iterator, multimap<string, IQRespD>::iterator> it = req_id.equal_range(st.getID());
00470 #ifdef debug_output
00471 cout << "respCount" << req_id.size() << endl;
00472 #endif
00473 while (it.first != it.second) {
00474 #ifdef debug_output
00475 cout << "iq from get F:" << st.getFrom() << endl;
00476 #endif
00477 IQRespD* rd = &(it.first->second);
00478 Node *n = st.getKind(rd->xmlns);
00479 if ((n != NULL) && (n->getName() == rd->subtagName)) {
00480 rd->result = st;
00481 struct sembuf sops;
00482 memset(&sops, 0, sizeof sops);
00483 sops.sem_op = 1;
00484 #ifdef debug_output
00485 cout << "===opening semaphore!!!===" << endl;
00486 #endif
00487
00488
00489 int sem_id = rd->semID;
00490
00491 rd->semID = -1;
00492 semop(sem_id, &sops, 1);
00493 semctl(sem_id, 0, IPC_RMID);
00494 }
00495 ++it.first;
00496 }
00497 pthread_mutex_unlock(&cs_mutex1);
00498 }
00499
00500 TagIQ Protocol::sendIQResp(TagIQ& st, string resultXMLNS, string resultSubTagName) {
00501
00502 struct sembuf sops;
00503 int cursem;
00504 struct timespec timeout;
00505
00506 memset(&sops, 0, sizeof sops);
00507 memset(&timeout, 0, sizeof timeout);
00508
00509 cursem = semget(IPC_PRIVATE, 1, 0777 | IPC_CREAT);
00510 #ifdef debug_output
00511 cout << "SEMAPHORE IDENTIFIER!: " << cursem << " errNo:" << errno << endl;
00512 #endif
00513
00514 sops.sem_op = -1;
00515 timeout.tv_sec = 600;
00516
00519
00520 pthread_mutex_lock(&cs_mutex1);
00521 string key = st.getID();
00522 IQRespD rd;
00523 rd.semID = -1;
00524 multimap<string, IQRespD>::iterator it = req_id.insert(make_pair(key, rd));
00525
00526 it->second.semID = cursem;
00527 it->second.subtagName = resultSubTagName;
00528 it->second.xmlns = resultXMLNS;
00529 pthread_mutex_unlock(&cs_mutex1);
00530
00531
00532
00533 sendNode(st);
00534
00535
00536
00537 semtimedop(cursem, &sops, 1, &timeout);
00538
00540
00541 pthread_mutex_lock(&cs_mutex1);
00542 TagIQ res(it->second.result);
00543 req_id.erase(it);
00544 pthread_mutex_unlock(&cs_mutex1);
00546 return res;
00547 }
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573 Protocol::~Protocol() {
00574
00575
00576
00577
00578 pthread_mutex_destroy(&mutDelEndNodRu);
00579 pthread_cond_destroy(&cond_del_element);
00580
00581
00582 pthread_attr_destroy(&attr);
00584 if(curNodeSend!=NULL){
00585 cout<<"delete curnodesend!"<<endl;
00586 delete curNodeSend;
00587 }
00588 cout<<"деструктор протокола"<<endl;
00589
00590 pthread_mutex_destroy(&cs_mutex);
00591 pthread_mutex_destroy(&cs_mutex1);
00592 pthread_mutex_destroy(&cs_mutexNewNode);
00593 pthread_mutex_destroy(&cs_mutexDEL);
00594 }
00595
00596 IQRespD::~IQRespD() {
00597 if (semID != -1) {
00598 semctl(semID, 0, IPC_RMID);
00599 }
00600 }