XRootD
Loading...
Searching...
No Matches
XrdBwmHandle.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d B w m H a n d l e . c c */
4/* */
5/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstring>
33
36#include "XrdBwm/XrdBwmTrace.hh"
38#include "XrdSys/XrdSysError.hh"
40
42
43/******************************************************************************/
44/* S t a t i c O b j e c t s */
45/******************************************************************************/
46
47XrdBwmLogger *XrdBwmHandle::Logger = 0;
48XrdBwmPolicy *XrdBwmHandle::Policy = 0;
49XrdBwmHandle *XrdBwmHandle::Free = 0;
50unsigned int XrdBwmHandle::numQueued = 0;
51
53
54/******************************************************************************/
55/* L o c a l C l a s s e s */
56/******************************************************************************/
57
59{
// Callback object used by the dispatcher. The members visible here recycle
// instances through a class-wide free list that is protected by xMutex.
// NOTE(review): the class header (listing line 58), the declarator of the
// static allocator (line 63) and line 82 are not present in this listing.
60public:
61
// Static allocator: under xMutex, pop the head of the free list when there is
// one; otherwise construct a new XrdBwmHandleCB. Returns the object obtained.
62static
64 {XrdBwmHandleCB *mP;
65 xMutex.Lock();
66 if (!(mP = Free)) mP = new XrdBwmHandleCB;
67 else Free = mP->Next;
68 xMutex.UnLock();
69 return mP;
70 }
71
// Done: push this object back onto the free list under xMutex. None of the
// three arguments is referenced by the body.
72void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
73 {xMutex.Lock();
74 Next = Free;
75 Free = this;
76 xMutex.UnLock();
77 }
78
// Same: always returns 0, whatever the two arguments are.
79int Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
80
// A new object starts out unlinked (Next is null).
81 XrdBwmHandleCB() : Next(0) {}
83
84private:
// Next links objects on the free list; xMutex guards Free and the links.
85 XrdBwmHandleCB *Next;
86static XrdSysMutex xMutex;
87static XrdBwmHandleCB *Free;
88};
89
// Storage for the class-wide mutex and the (initially empty) free list.
90XrdSysMutex XrdBwmHandleCB::xMutex;
91XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
92
93/******************************************************************************/
94/* E x t e r n a l L i n k a g e s */
95/******************************************************************************/
96
// Thread entry point: setPolicy() hands this function to XrdSysThread::Run()
// with a null argument.
// NOTE(review): the function body (listing line 99) is absent from this
// listing; presumably it forwards to XrdBwmHandle::Dispatch() -- confirm.
97void *XrdBwmHanXeq(void *pp)
98{
100}
101
102/******************************************************************************/
103/* c l a s s X r d B w m H a n d l e */
104/******************************************************************************/
105/******************************************************************************/
106/* A c t i v a t e */
107/******************************************************************************/
108
109#define tident Parms.Tident
110
112{
// Activate: ask the policy to schedule the request described by Parms.
// Returns SFS_ERROR when the handle is not Idle or when Policy->Schedule()
// returns 0; SFS_DATA or SFS_OK when Schedule() returns a positive value
// (SFS_DATA if the message buffer is non-empty); SFS_STARTED when Schedule()
// returns a negative value, i.e. the request was queued.
// NOTE(review): the signature (listing line 111) and lines 137 and 152 are
// absent from this listing; presumably 137/152 update Status -- confirm.
113 EPNAME("Activate");
// hMutex is held until the function returns.
114 XrdSysMutexHelper myHelper(hMutex);
115 char *rBuff;
116 int rSize, rc;
117
118// Check the status of this request. Only an Idle handle may be activated.
119//
120 if (Status != Idle)
121 {if (Status == Scheduled)
122 einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
123 else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
124 return SFS_ERROR;
125 }
126
127// Try to schedule this request. The policy is given einfo's message buffer;
128// a zero return is treated as failure.
129 qTime = time(0);
130 rBuff = einfo.getMsgBuff(rSize);
131 if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
132
133// If resource immediately available, let client run. The positive return
134// value becomes the resource handle.
135 if (rc > 0)
136 {rHandle = rc;
138 rTime = time(0);
139 ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
140 <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
141 <<Parms.RmtNode);
// The error code carries the length of whatever text is in the buffer.
142 einfo.setErrCode(strlen(rBuff));
143 return (*rBuff ? SFS_DATA : SFS_OK);
144 }
145
146// Request was queued. We need to hold on to this so we can issue an async
147// response later when the resource becomes available. The negated return
148// value is the resource handle under which this object is filed.
149 rHandle = -rc;
// Save the caller's callback and argument, then substitute our own myEICB.
150 ErrCB = einfo.getErrCB(ErrCBarg);
151 einfo.setErrCB((XrdOucEICB *)&myEICB);
// File this handle in the reference table so Dispatch() can find it later.
153 refHandle(rHandle, this);
154 ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
155 <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
156 <<Parms.RmtNode);
157
158// Indicate that client needs to wait
159//
160 return SFS_STARTED;
161}
162#undef tident
163
164/******************************************************************************/
165/* static public A l l o c # 1 */
166/******************************************************************************/
167
// Alloc #1: obtain a handle from the private allocator and fill in its request
// parameters. theUsr is stored by pointer (not copied); thePath, LclNode and
// RmtNode are duplicated with strdup() and released again by Retire().
// Incoming selects the transfer direction. Returns the initialized handle, or
// null when the private allocator returned none.
// NOTE(review): listing line 182 (the remainder of the Direction expression)
// is absent from this listing.
168XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr, const char *thePath,
169 const char *LclNode, const char *RmtNode,
170 int Incoming)
171{
172 XrdBwmHandle *hP = Alloc();
173
174// Initialize the handle
175//
176 if (hP)
177 {hP->Parms.Tident = theUsr; // Always available
178 hP->Parms.Lfn = strdup(thePath);
179 hP->Parms.LclNode = strdup(LclNode);
180 hP->Parms.RmtNode = strdup(RmtNode);
181 hP->Parms.Direction = (Incoming ? XrdBwmPolicy::Incoming
// Start out Idle with all timing and size statistics cleared.
183 hP->Status = Idle;
184 hP->qTime = 0;
185 hP->rTime = 0;
186 hP->xSize = 0;
187 hP->xTime = 0;
188 }
189
190// All done
191//
192 return hP;
193}
194
195/******************************************************************************/
196/* private A l l o c # 2 */
197/******************************************************************************/
198
200{
201 static XrdSysMutex aMutex;
202 constexpr int minAlloc = 4096/sizeof(XrdBwmHandle);
203 XrdBwmHandle *hP = nullptr;
204
205// No handle currently in the table. Get a new one off the free list or
206// return one to the free list.
207//
208 aMutex.Lock();
209 if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
210 else {if (!Free)
211 if ((hP = new XrdBwmHandle[minAlloc]()))
212 {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
213 if ((hP = Free)) Free = hP->Next;
214 }
215 aMutex.UnLock();
216
217 return hP;
218}
219
220/******************************************************************************/
221/* D i s p a t c h */
222/******************************************************************************/
223
224#define tident hP->Parms.Tident
225
227{
// Dispatch: endless loop that takes ready requests from the policy, looks up
// the matching handle and invokes the callback saved on that handle. The loop
// has no exit; the final return only exists to satisfy the compiler.
// NOTE(review): the signature (listing line 226) and line 229, which
// presumably declares and initializes erP, are absent from this listing.
228 EPNAME("Dispatch");
230 XrdBwmHandle *hP;
231 char *RespBuff;
232 int RespSize, readyH, Result, Err;
233
234// Dispatch ready requests in an endless loop
235//
236 do {
237
238// Setup buffer: start each round with an empty message and a zero error code
239//
240 RespBuff = erP->getMsgBuff(RespSize);
241 *RespBuff = '\0';
242 erP->setErrCode(0);
243
244// Get next ready request and test if it ended with an error. Err receives the
245// result of the "< 0" comparison; a negative handle is made positive.
246 if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
247 readyH = -readyH;
248
249// Find the matching handle; this lookup also removes it from the table. When
250// none is found, report it and release the resource unless it was an error.
251 if (!(hP = refHandle(readyH)))
252 {sprintf(RespBuff, "%d", readyH);
253 BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
254 if (!Err) Policy->Done(readyH);
255 continue;
256 }
257
258// Lock the handle and make sure it can be dispatched
259//
260 hP->hMutex.Lock();
261 if (hP->Status != Scheduled)
262 {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
263 hP->Parms.Tident, hP->Parms.Lfn);
264 if (!Err) Policy->Done(readyH);
265 } else {
// Wait on the handle's callback object, then record the run time.
266 hP->myEICB.Wait(); hP->rTime = time(0);
// Hand the response object the argument that Activate() saved.
267 erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
// An error returns the handle to Idle; otherwise it becomes Dispatched and
// the result is SFS_DATA when the policy left text in the buffer.
268 if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
269 else {hP->Status = Dispatched;
270 erP->setErrCode(strlen(RespBuff));
271 Result = (*RespBuff ? SFS_DATA : SFS_OK);
272 }
273 ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
274 <<(hP->Parms.Direction == XrdBwmPolicy::Incoming ? " <- ":" -> ")
275 <<hP->Parms.RmtNode);
// Invoke the saved callback, then obtain a new response object for the next
// round.
276 hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
277 erP = XrdBwmHandleCB::Alloc();
278 }
279 hP->hMutex.UnLock();
280 } while(1);
281
282// Keep the compiler happy
283//
284 return (void *)0;
285}
286
287#undef tident
288
289/******************************************************************************/
290/* private r e f H a n d l e */
291/******************************************************************************/
292
293XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
294{
295 static XrdSysMutex tMutex;
296 static struct {XrdBwmHandle *First;
297 XrdBwmHandle *Last;
298 } hTab[256] = {{0,0}};
299 XrdBwmHandle *pP = 0;
300 int i = refID % 256;
301
302// If we have a handle passed, add the handle to the table
303//
304 tMutex.Lock();
305 if (hP)
306 {hP->Next = 0;
307 if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
308 else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
309 numQueued++;
310 } else {
311 hP = hTab[i].First;
312 while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
313 if (hP)
314 {if (pP) pP->Next = hP->Next;
315 else hTab[i].First = hP->Next;
316 if (hTab[i].Last == hP) hTab[i].Last = pP;
317 numQueued--;
318 }
319 }
320 tMutex.UnLock();
321
322// All done.
323//
324 return hP;
325}
326
327/******************************************************************************/
328/* public R e t i r e */
329/******************************************************************************/
330
331// The handle must be locked upon entry! It is unlocked upon exit.
332
334{
335 XrdSysMutexHelper myHelper(hMutex);
336
337// Get the global lock as the links field can only be manipulated with it.
338// If not idle, cancel the resource. If scheduled, remove it from the table.
339//
340 if (Status != Idle)
341 {Policy->Done(rHandle);
342 if (Status == Scheduled && !refHandle(rHandle, this))
343 BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
344 Status = Idle; rHandle = 0;
345 }
346
347// If we have a logger, then log this event
348//
349 if (Logger && qTime)
350 {XrdBwmLogger::Info myInfo;
351 myInfo.Tident = Parms.Tident;
352 myInfo.Lfn = Parms.Lfn;
353 myInfo.lclNode = Parms.LclNode;
354 myInfo.rmtNode = Parms.RmtNode;
355 myInfo.ATime = qTime;
356 myInfo.BTime = rTime;
357 myInfo.CTime = time(0);
358 myInfo.Size = xSize;
359 myInfo.ESec = xTime;
360 myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incoming ? 'I':'O');
361 Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
362 Logger->Event(myInfo);
363 }
364
365// Free storage appendages and recycle handle
366//
367 if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;}
368 if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
369 if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
370 Alloc(this);
371}
372
373/******************************************************************************/
374/* s e t P o l i c y */
375/******************************************************************************/
376
378{
379 pthread_t tid;
380 int rc, startThread = (Policy == 0);
381
382// Set the policy and then start a thread to do dispatching if we have none
383//
384 Policy = pP;
385 if (startThread)
386 if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
387 0, "Handle Dispatcher")))
388 {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
389 return 1;
390 }
391
392// All done
393//
394 Logger = lP;
395 return 0;
396}
@ kXR_InvalidRequest
Definition XProtocol.hh:996
@ kXR_inProgress
void * XrdBwmHanXeq(void *pp)
#define EPNAME(x)
#define ZTRACE(act, x)
XrdSysError BwmEroute(0)
Definition XrdBwm.cc:69
#define Err(p, a, b, c)
XrdOucString Path
#define SFS_DATA
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
static XrdBwmHandleCB * Alloc()
void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
int Same(unsigned long long arg1, unsigned long long arg2)
static void * Dispatch()
int Activate(XrdOucErrInfo &einfo)
HandleState Status
static XrdBwmHandle * Alloc(const char *theUsr, const char *thePath, const char *lclNode, const char *rmtNode, int Incoming)
static int setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
const char * rmtNode
const char * lclNode
const char * Tident
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
XrdOucEICB()
Constructor and destructor.
XrdOucEICB * getErrCB()
char * getMsgBuff(int &mblen)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
XrdOucErrInfo(const char *user=0, XrdOucEICB *cb=0, unsigned long long ca=0, int mid=0, int uc=0)
int setErrCode(int code)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)