XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.hh
Go to the documentation of this file.
/*
 * XrdThrottleManager
 *
 * This class provides an implementation of a throttle manager.
 * The throttle manager purposely pauses if the bandwidth, IOPS
 * rate, or number of outstanding IO requests is sustained above
 * a certain level.
 *
 * The XrdThrottleManager is user-aware and provides fairshare.
 *
 * This works by having a separate thread periodically refilling
 * each user's shares.
 *
 * Note that we do not actually keep close track of users, but rather
 * put them into a hash. This way, we can pretend there's a constant
 * number of users and use a lock-free algorithm.
 */

#ifndef __XrdThrottleManager_hh_
#define __XrdThrottleManager_hh_

23#ifdef __GNUC__
24#define likely(x) __builtin_expect(!!(x), 1)
25#define unlikely(x) __builtin_expect(!!(x), 0)
26#else
27#define likely(x) x
28#define unlikely(x) x
29#endif
30
31#include <array>
32#include <ctime>
33#include <condition_variable>
34#include <memory>
35#include <mutex>
36#include <string>
37#include <unordered_map>
38#include <vector>
39
42
43class XrdSecEntity;
44class XrdSysError;
45class XrdOucTrace;
48
49namespace XrdThrottle {
50 class Configuration;
51}
52
54{
55
56friend class XrdThrottleTimer;
57
58public:
59
60void Init();
61
62bool OpenFile(const std::string &entity, std::string &open_error_message);
63bool CloseFile(const std::string &entity);
64
65void Apply(int reqsize, int reqops, int uid);
66
68
69bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
70
71// Returns the user name and UID for the given client.
72//
73// The UID is a hash of the user name; it is not guaranteed to be unique.
74std::tuple<std::string, uint16_t> GetUserInfo(const XrdSecEntity *client);
75
76void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
77 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
78 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
79
80void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
81 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
82
83void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
84
85void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
86
87void SetMaxWait(unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
88
89void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
90
91//int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
92
93// Notify that an I/O operation has started for a given user.
94//
95// If we are at the maximum concurrency limit then this will block;
96// if we block for too long, the second return value will return false.
97XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok);
98
99void PrepLoadShed(const char *opaque, std::string &lsOpaque);
100
101bool CheckLoadShed(const std::string &opaque);
102
103void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
104
106
107 ~XrdThrottleManager() {} // The buffmanager is never deleted
108
109protected:
110
111// Notify the manager an I/O operation has completed for a given user.
112// This is used to update the I/O wait time for the user and, potentially,
113// wake up a waiting thread.
114void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
115
116private:
117
118// Determine the UID for a given user name.
119// This is a hash of the username; it is not guaranteed to be unique.
120// The UID is used to index into the waiters array and cannot be more than m_max_users.
121uint16_t GetUid(const std::string &);
122
123void Recompute();
124
125void RecomputeInternal();
126
127static
128void * RecomputeBootstrap(void *pp);
129
130// Compute the order of wakeups for the existing waiters.
131void ComputeWaiterOrder();
132
133// Walk through the outstanding IO operations and compute the per-user
134// IO time.
135//
136// Meant to be done periodically as part of the Recompute interval. Used
137// to make sure we have a better estimate of the concurrency for each user.
138void UserIOAccounting();
139
140int WaitForShares();
141
142void GetShares(int &shares, int &request);
143
144void StealShares(int uid, int &reqsize, int &reqops);
145
146// Return the timer hash list ID to use for the current request.
147//
148// When on Linux, this will hash across the CPU ID; the goal is to distribute
149// the different timers across several lists to avoid mutex contention.
150static unsigned GetTimerListHash();
151
152// Notify a single waiter thread that it can proceed.
153void NotifyOne();
154
155XrdOucTrace * m_trace;
156XrdSysError * m_log;
157
158XrdSysCondVar m_compute_var;
159
160// Controls for the various rates.
161float m_interval_length_seconds;
162float m_bytes_per_second;
163float m_ops_per_second;
164int m_concurrency_limit;
165
166// Maintain the shares
167
168static constexpr int m_max_users = 1024; // Maximum number of users we can have; used for various fixed-size arrays.
169std::vector<int> m_primary_bytes_shares;
170std::vector<int> m_secondary_bytes_shares;
171std::vector<int> m_primary_ops_shares;
172std::vector<int> m_secondary_ops_shares;
173int m_last_round_allocation;
174
175// Waiter counts for each user
176struct alignas(64) Waiter
177{
178 std::condition_variable m_cv; // Condition variable for waiters of this user.
179 std::mutex m_mutex; // Mutex for this structure
180 unsigned m_waiting{0}; // Number of waiting operations for this user.
181
182 // EWMA of the concurrency for this user. This is used to determine how much
183 // above / below the user's concurrency share they've been recently. This subsequently
184 // will affect the likelihood of being woken up.
185 XrdSys::RAtomic<float> m_concurrency{0};
186
187 // I/O time for this user since the last recompute interval. The value is used
188 // to compute the EWMA of the concurrency (m_concurrency).
189 XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_time{0};
190
191 // Pointer to the XrdThrottleManager object that owns this waiter.
192 XrdThrottleManager *m_manager{nullptr};
193
194 // Causes the current thread to wait until it's the user's turn to wake up.
195 bool Wait();
196
197 // Wakes up one I/O operation for this user.
198 void NotifyOne(std::unique_lock<std::mutex> lock)
199 {
200 m_cv.notify_one();
201 }
202};
203std::array<Waiter, m_max_users> m_waiter_info;
204
205// Array with the wake up ordering of the waiter users.
206// Every recompute interval, we compute how much over the concurrency limit
207// each user is, quantize this to an integer number of shares and then set the
208// array value to the user ID (so if user ID 5 has two shares, then there are two
209// entries with value 5 in the array). The array is then shuffled to randomize the
210// order of the wakeup.
211//
212// All reads and writes to the wake order array are meant to be relaxed atomics; if a thread
213// has an outdated view of the array, it simply means that a given user might get slightly
214// incorrect random probability of being woken up. That's seen as acceptable to keep
215// the selection algorithm lock and fence-free.
216std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
217std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1; // A second wake order array; every recompute interval, we will swap the active array, avoiding locks.
218XrdSys::RAtomic<char> m_wake_order_active; // The current active wake order array; 0 or 1
219std::atomic<size_t> m_waiter_offset{0}; // Offset inside the wake order array; this is used to wake up the next potential user in line. Cannot be relaxed atomic as offsets need to be seen in order.
220std::chrono::steady_clock::time_point m_last_waiter_recompute_time; // Last time we recomputed the wait ordering.
221XrdSys::RAtomic<unsigned> m_waiting_users{0}; // Number of users waiting behind the throttle as of the last recompute time.
222
223std::atomic<uint32_t> m_io_active; // Count of in-progress IO operations: cannot be a relaxed atomic as ordering of inc/dec matters.
224XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_active_time; // Total IO wait time recorded since the last recompute interval; reset to zero about every second.
225XrdSys::RAtomic<uint64_t> m_io_total{0}; // Monotonically increasing count of IO operations; reset to zero about every second.
226
227int m_stable_io_active{0}; // Number of IO operations in progress as of the last recompute interval; must hold m_compute_var lock when reading/writing.
228uint64_t m_stable_io_total{0}; // Total IO operations since startup. Recomputed every second; must hold m_compute_var lock when reading/writing.
229
230std::chrono::steady_clock::duration m_stable_io_wait; // Total IO wait time as of the last recompute interval.
231
232// Load shed details
233std::string m_loadshed_host;
234unsigned m_loadshed_port;
235unsigned m_loadshed_frequency;
236
237// The number of times we have an I/O operation that hit the concurrency limit.
238// This is monotonically increasing and is "relaxed" because it's purely advisory;
239// ordering of the increments between threads is not important.
240XrdSys::RAtomic<int> m_loadshed_limit_hit;
241
242// Maximum number of open files
243unsigned long m_max_open{0};
244unsigned long m_max_conns{0};
245std::unordered_map<std::string, unsigned long> m_file_counters;
246std::unordered_map<std::string, unsigned long> m_conn_counters;
247std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
248std::mutex m_file_mutex;
249
250// Track the ongoing I/O operations. We have several linked lists (hashed on the
251// CPU ID) of I/O operations that are in progress. This way, we can periodically sum
252// up the time spent in ongoing operations - which is important for operations that
253// last longer than the recompute interval.
254struct TimerList {
255 std::mutex m_mutex;
256 XrdThrottleTimer *m_first{nullptr};
257 XrdThrottleTimer *m_last{nullptr};
258};
259#if defined(__linux__)
260static constexpr size_t m_timer_list_size = 32;
261#else
262static constexpr size_t m_timer_list_size = 1;
263#endif
264std::array<TimerList, m_timer_list_size> m_timer_list; // A vector of linked lists of I/O operations. We keep track of multiple instead of a single one to avoid a global mutex.
265
266// Maximum wait time for a user to perform an I/O operation before failing.
267// Most clients have some sort of operation timeout; after that point, if we go
268// ahead and do the work, it's wasted effort as the client has gone.
269std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
270
271// Monitoring handle, if configured
272XrdXrootdGStream* m_gstream{nullptr};
273
274static const char *TraceID;
275
276};
277
279{
280
282
283public:
284
286{
287 if (m_manager) {
288 StopTimer();
289 }
290}
291
292protected:
293
295 m_start_time(std::chrono::steady_clock::time_point::min())
296{}
297
299 m_owner(uid),
300 m_timer_list_entry(XrdThrottleManager::GetTimerListHash()),
301 m_manager(manager),
302 m_start_time(std::chrono::steady_clock::now())
303{
304 if (!m_manager) {
305 return;
306 }
307 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
308 std::lock_guard<std::mutex> lock(timerList.m_mutex);
309 if (timerList.m_first == nullptr) {
310 timerList.m_first = this;
311 } else {
312 m_prev = timerList.m_last;
313 m_prev->m_next = this;
314 }
315 timerList.m_last = this;
316}
317
318std::chrono::steady_clock::duration Reset() {
319 auto now = std::chrono::steady_clock::now();
320 auto last_start = m_start_time.exchange(now);
321 return now - last_start;
322}
323
324private:
325
326 void StopTimer()
327 {
328 if (!m_manager) return;
329
330 auto event_duration = Reset();
331 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
332 {
333 std::unique_lock<std::mutex> lock(timerList.m_mutex);
334 if (m_prev) {
335 m_prev->m_next = m_next;
336 if (m_next) {
337 m_next->m_prev = m_prev;
338 } else {
339 timerList.m_last = m_prev;
340 }
341 } else {
342 timerList.m_first = m_next;
343 if (m_next) {
344 m_next->m_prev = nullptr;
345 } else {
346 timerList.m_last = nullptr;
347 }
348 }
349 }
350 m_manager->StopIOTimer(event_duration, m_owner);
351 }
352
353 const uint16_t m_owner{0};
354 const uint16_t m_timer_list_entry{0};
355 XrdThrottleManager *m_manager{nullptr};
356 XrdThrottleTimer *m_prev{nullptr};
357 XrdThrottleTimer *m_next{nullptr};
358 XrdSys::RAtomic<std::chrono::steady_clock::time_point> m_start_time;
359
360};

#endif
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
friend class XrdThrottleManager
std::chrono::steady_clock::duration Reset()