56#define DEBUGXQ(x) DEBUG(rID<<sessN<<rspstID[urState]<<reqstID[myState]<<x)
58#define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
81 {
" [new",
" [begun",
" [bound",
86 {
" wtReq] ",
" xqReq] ",
" wtRsp] ",
87 " doRsp] ",
" odRsp] ",
" erRsp] "
97class FinalizeJob :
public XrdJob
101void DoIt() {reqP->Finalize();
102 fileP->DeferredFinalizeDone(reqP, reqID);
106 FinalizeJob(XrdSsiFileReq *rP, XrdSsiFileSess *fP,
unsigned int id) :
107 reqP(rP), fileP(fP), reqID(id) {}
112XrdSsiFileSess *fileP;
123int XrdSsiFileReq::freeCnt = 0;
124int XrdSsiFileReq::freeMax = 256;
136 DEBUGXQ((oP ?
"oucbuff" :
"sfsbuff") <<
" rqsz=" <<rSz);
140 Stats.statsMutex.Lock();
142 Stats.ReqBytes += rSz;
143 if (rSz >
Stats.ReqMaxsz)
Stats.ReqMaxsz = rSz;
144 Stats.statsMutex.UnLock();
170 DEBUGXQ(msgLen <<
" byte alert presented wtr=" <<respWait);
182 if (msgLen <= 0 || haveResp || isEnding)
198 {alrtLast->next = aP;
201 alrtPend = alrtPend->
next;
205 if (alrtLast) alrtLast->next = aP;
233 freeReq = nP->nextReq;
249 snprintf(nP->rID,
sizeof(nP->rID),
"%u:", rnum);
263void XrdSsiFileReq::BindDone()
269 DEBUGXQ(
"Bind called; for request " <<reqID);
283 case isDone:
if (!schedDone)
295 Log.
Emsg(epname, tident,
"Invalid req/rsp state; giving up on object!");
304 Sched->Schedule(
new FinalizeJob(
this, fileP, reqID));
311void XrdSsiFileReq::Dispose()
317 DEBUGXQ(
"Recycling request...");
343 DEBUGXQ(
"Calling service processor");
357 DEBUGXQ(
"Calling Finished(" <<cancel <<
')');
358 if (respWait) WakeUp();
359 if (finWait) finWait->Post();
373 Log.Emsg(epname, tident,
"Invalid req/rsp state; giving up on object!");
390 if (eiP != fileP->errInfo())
delete eiP;
395 if (myState ==
odRsp)
396 {
DEBUGXQ(
"resp sent; no additional data remains");
397 if (!fileP->DeferFinalize(
this,reqID))
Finalize();
403 DEBUGXQ(
"wtrsp sent; resp " <<(haveResp ?
"here" :
"pend"));
410 if (!haveResp) respWait =
true;
418int XrdSsiFileReq::Emsg(
const char *pfx,
430 if (ecode < 0) ecode = -ecode;
442 if (cbInfo) cbInfo->
setErrInfo(ecode, buffer);
448int XrdSsiFileReq::Emsg(
const char *pfx,
463 if (eNum <= 0) eNum = EFAULT;
468 snprintf(buffer,
sizeof(buffer),
"Unable to %s %s; %s", op, sessN,
eMsg);
476 if (cbInfo) cbInfo->setErrInfo(eNum, buffer);
488 bool cancel = (myState !=
odRsp);
493 if (alrtSent || alrtPend)
495 if (aP) aP->
next = alrtPend;
499 mHelper.
Lock(frqMutex);
511 DEBUGXQ(
"Aborting request processing");
530 if (strBuff) {strBuff->Recycle(); strBuff = 0;}
531 DEBUGXQ(
"Calling Finished(" <<cancel <<
')');
532 if (respWait) WakeUp();
544 case isDone: sessN =
"bad";
553 Log.Emsg(epname, tident,
"Invalid req/rsp state; giving up on object!");
572 if (oucBuff)
return oucBuff->Data();
580void XrdSsiFileReq::Init(
const char *cID)
582 tident = (cID ? strdup(cID) : strdup(
"???"));
620 EPNAME(
"ProcessResponse");
624 DEBUGXQ(
"Response presented wtr=" <<respWait);
636 DEBUGXQ(
"Resp data sz="<<Resp.blen);
641 DEBUGXQ(
"Resp err rc="<<Resp.eNum<<
" msg="<<Resp.eMsg);
646 DEBUGXQ(
"Resp file fd="<<Resp.fdnum<<
" sz="<<Resp.fsize);
666 if (respWait) WakeUp();
688 static const char *epname =
"read";
694 if (myState !=
doRsp)
696 return (myState ==
odRsp ? 0 : Emsg(epname, ENOMSG,
"read"));
703 if (respLen <= 0) {done =
true; myState =
odRsp;
return 0;}
705 {memcpy(buff, Resp->buff+respOff, respLen);
706 blen = respLen; myState =
odRsp; done =
true;
708 memcpy(buff, Resp->buff+respOff, blen);
709 respLen -= blen; respOff += blen;
714 cbInfo->setErrInfo(Resp->eNum, Resp->eMsg);
715 myState =
odRsp; done =
true;
719 if (fileSz <= 0) {done =
true; myState =
odRsp;
return 0;}
720 nbytes =
pread(Resp->fdnum, buff, blen, respOff);
723 if (!nbytes) {myState =
odRsp;
return 0;}
725 return Emsg(epname, errno,
"read");
727 respOff += nbytes; fileSz -= nbytes;
732 readStrmA(Resp->strmP, buff, blen)
733 : readStrmP(Resp->strmP, buff, blen));
734 done = strmEOF && strBuff == 0;
744 return Emsg(epname, EFAULT,
"read");
754 static const char *epname =
"readStrmA";
763 {memcpy(buff, strBuff->
data+respOff, blen);
764 respLen -= blen; respOff += blen;
767 memcpy(buff, strBuff->data+respOff, respLen);
769 strBuff->Recycle(); strBuff = 0;
770 blen -= respLen; buff += respLen;
773 if (!strmEOF && blen)
774 {respLen = blen; respOff = 0;
775 strBuff = strmP->
GetBuff(eObj, respLen, strmEOF);
781 if (strmEOF) {myState =
odRsp;
return xlen;}
782 else if (!blen)
return xlen;
786 myState =
erRsp; strmEOF =
true;
787 return Emsg(epname, eObj,
"read stream");
797 static const char *epname =
"readStrmP";
804 while(!strmEOF && (dlen = strmP->
SetBuff(eObj, buff, blen, strmEOF)) > 0)
806 if (dlen == blen)
return xlen;
807 if (dlen > blen) {eObj.
Set(0,EOVERFLOW);
break;}
808 buff += dlen; blen -= dlen;
813 if (strmEOF || !dlen) {myState =
odRsp; strmEOF =
true;
return xlen;}
817 myState =
erRsp; strmEOF =
true;
818 return Emsg(epname, eObj,
"read stream");
825void XrdSsiFileReq::Recycle()
831 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
839 if (tident) {free(tident); tident = 0;}
840 if (freeCnt >= freeMax) {aqMutex.UnLock();
delete this;}
865 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
876 static const char *epname =
"send";
884 if (myState !=
doRsp)
return 1;
891 {sfVec[1].buffer = (
char *)Resp->buff+respOff;
894 {blen = respLen; myState =
odRsp;
896 respLen -= blen; respOff += blen;
905 {sfVec[1].offset = respOff; sfVec[1].
fdnum = Resp->fdnum;
907 {blen = fileSz; myState =
odRsp;}
908 respOff += blen; fileSz -= blen;
913 return sendStrmA(Resp->strmP, sfDio, blen);
915 default: myState =
erRsp;
916 return Emsg(epname, EFAULT,
"send");
922 if (!blen) {sfVec[1].buffer = rID; myState =
odRsp;}
928 if (!rc)
return myState !=
odRsp;
932 rc = (rc < 0 ? EIO : EFAULT);
934 return Emsg(epname, rc,
"send");
944 static const char *epname =
"sendStrmA";
953 if (strmEOF || !(strBuff = strmP->
GetBuff(eObj, respLen, strmEOF)))
954 {myState =
odRsp; strmEOF =
true;
955 if (!strmEOF) Emsg(epname, eObj,
"read stream");
963 sfVec[1].buffer = strBuff->data+respOff;
967 respLen -= blen; respOff += blen;
969 sfVec[1].
sendsz = respLen;
979 if (strBuff && !respLen) {strBuff->Recycle(); strBuff = 0;}
983 if (!rc)
return myState !=
odRsp;
987 rc = (rc < 0 ? EIO : EFAULT);
988 myState =
erRsp; strmEOF =
true;
989 return Emsg(epname, rc,
"send");
1005 if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;}
1009 frqMon.
Lock(frqMutex);
1016 {
char hexBuff[16], binBuff[8], dotBuff[4];
1017 alrtSent = alrtPend;
1018 if (!(alrtPend = alrtPend->next)) alrtLast = 0;
1019 int n = alrtSent->SetInfo(eInfo, binBuff,
sizeof(binBuff));
1021 DEBUGXQ(n <<
" byte alert (0x" <<
DUMPIT(binBuff, n) <<
") sent; "
1022 <<(alrtPend ?
"" :
"no ") <<
"more pending");
1031 if (fileP->AttnInfo(eInfo, rspP, reqID)) myState =
odRsp;
1043 respCB = eInfo.
getErrCB(respCBarg);
1070 {
char hexBuff[16], binBuff[8], dotBuff[4];
1071 int n = aP->
SetInfo(*wuInfo, binBuff,
sizeof(binBuff));
1073 DEBUGXQ(n <<
" byte alert (0x" <<
DUMPIT(binBuff, n) <<
") sent; "
1074 <<(alrtPend ?
"" :
"no ") <<
"more pending");
1076 if (fileP->AttnInfo(*wuInfo, rspP, reqID))
1083 respCB->Done(respCode, wuInfo, sessN);
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define pread(a, b, c, d)
class XrdBuffer * XrdSfsXioHandle
XrdJob(const char *desc="")
XrdOucEICB()
Constructor and destructor.
static int Format(char *buff, int blen, int ecode, const char *etxt1, const char *etxt2=0)
void setErrArg(unsigned long long cbarg=0)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
virtual int SendFile(int fildes)=0
static void Reclaim(XrdSfsXioHandle theHand)
static char * Buffer(XrdSfsXioHandle theHand, int *buffsz=0)
static XrdSsiAlert * Alloc(XrdSsiRespInfoMsg &aMsg)
int SetInfo(XrdOucErrInfo &eInfo, char *aMsg, int aLen)
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
const std::string & Get(int &eNum) const
void Alert(XrdSsiRespInfoMsg &aMsg)
Send or receive a server generated alert.
bool WantResponse(XrdOucErrInfo &eInfo)
XrdSfsXferSize Read(bool &done, char *buffer, XrdSfsXferSize blen)
char * GetRequest(int &rLen)
void Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel=false)
bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &resp)
XrdSsiFileReq(const char *cID=0)
int Send(XrdSfsDio *sfDio, XrdSfsXferSize size)
static XrdSsiFileReq * Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, XrdSsiFileSess *fP, const char *sn, const char *id, unsigned int rnum)
void Activate(XrdOucBuffer *oP, XrdSfsXioHandle bR, int rSz)
void Done(int &Result, XrdOucErrInfo *cbInfo, const char *path=0)
void Lock(XrdSsiMutex *mutex)
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static void onServer(XrdSsiRequest *rP)
static XrdSsiRespInfo * RespP(XrdSsiRequest *rP)
static void CleanUp(XrdSsiRequest &reqR)
XrdSsiRequest(const char *reqid=0, uint16_t tmo=0)
virtual void RecycleMsg(bool sent=true)=0
virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen)
virtual Buffer * GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.