1 |
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 |
/* |
3 |
* Thread management for memcached. |
4 |
*/ |
5 |
#include "memcached.h" |
6 |
#include <assert.h> |
7 |
#include <stdio.h> |
8 |
#include <errno.h> |
9 |
#include <stdlib.h> |
10 |
#include <errno.h> |
11 |
#include <string.h> |
12 |
#include <pthread.h> |
13 |
|
14 |
#define ITEMS_PER_ALLOC 64 |
15 |
|
16 |
/* An item in the connection queue. */ |
17 |
typedef struct conn_queue_item CQ_ITEM; |
18 |
struct conn_queue_item { |
19 |
int sfd; |
20 |
enum conn_states init_state; |
21 |
int event_flags; |
22 |
int read_buffer_size; |
23 |
enum network_transport transport; |
24 |
CQ_ITEM *next; |
25 |
}; |
26 |
|
27 |
/* A connection queue. */ |
28 |
typedef struct conn_queue CQ; |
29 |
struct conn_queue { |
30 |
CQ_ITEM *head; |
31 |
CQ_ITEM *tail; |
32 |
pthread_mutex_t lock; |
33 |
pthread_cond_t cond; |
34 |
}; |
35 |
|
36 |
/* Lock for cache operations (item_*, assoc_*) */ |
37 |
pthread_mutex_t cache_lock; |
38 |
|
39 |
/* Connection lock around accepting new connections */ |
40 |
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER; |
41 |
|
42 |
/* Lock for global stats */ |
43 |
static pthread_mutex_t stats_lock; |
44 |
|
45 |
/* Free list of CQ_ITEM structs */ |
46 |
static CQ_ITEM *cqi_freelist; |
47 |
static pthread_mutex_t cqi_freelist_lock; |
48 |
|
49 |
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; |
50 |
|
51 |
/* |
52 |
* Each libevent instance has a wakeup pipe, which other threads |
53 |
* can use to signal that they've put a new connection on its queue. |
54 |
*/ |
55 |
static LIBEVENT_THREAD *threads; |
56 |
|
57 |
/* |
58 |
* Number of worker threads that have finished setting themselves up. |
59 |
*/ |
60 |
static int init_count = 0; |
61 |
static pthread_mutex_t init_lock; |
62 |
static pthread_cond_t init_cond; |
63 |
|
64 |
|
65 |
static void thread_libevent_process(int fd, short which, void *arg); |
66 |
|
67 |
/* |
68 |
* Initializes a connection queue. |
69 |
*/ |
70 |
static void cq_init(CQ *cq) { |
71 |
pthread_mutex_init(&cq->lock, NULL); |
72 |
pthread_cond_init(&cq->cond, NULL); |
73 |
cq->head = NULL; |
74 |
cq->tail = NULL; |
75 |
} |
76 |
|
77 |
/* |
78 |
* Looks for an item on a connection queue, but doesn't block if there isn't |
79 |
* one. |
80 |
* Returns the item, or NULL if no item is available |
81 |
*/ |
82 |
static CQ_ITEM *cq_pop(CQ *cq) { |
83 |
CQ_ITEM *item; |
84 |
|
85 |
pthread_mutex_lock(&cq->lock); |
86 |
item = cq->head; |
87 |
if (NULL != item) { |
88 |
cq->head = item->next; |
89 |
if (NULL == cq->head) |
90 |
cq->tail = NULL; |
91 |
} |
92 |
pthread_mutex_unlock(&cq->lock); |
93 |
|
94 |
return item; |
95 |
} |
96 |
|
97 |
/* |
98 |
* Adds an item to a connection queue. |
99 |
*/ |
100 |
static void cq_push(CQ *cq, CQ_ITEM *item) { |
101 |
item->next = NULL; |
102 |
|
103 |
pthread_mutex_lock(&cq->lock); |
104 |
if (NULL == cq->tail) |
105 |
cq->head = item; |
106 |
else |
107 |
cq->tail->next = item; |
108 |
cq->tail = item; |
109 |
pthread_cond_signal(&cq->cond); |
110 |
pthread_mutex_unlock(&cq->lock); |
111 |
} |
112 |
|
113 |
/* |
114 |
* Returns a fresh connection queue item. |
115 |
*/ |
116 |
static CQ_ITEM *cqi_new(void) { |
117 |
CQ_ITEM *item = NULL; |
118 |
pthread_mutex_lock(&cqi_freelist_lock); |
119 |
if (cqi_freelist) { |
120 |
item = cqi_freelist; |
121 |
cqi_freelist = item->next; |
122 |
} |
123 |
pthread_mutex_unlock(&cqi_freelist_lock); |
124 |
|
125 |
if (NULL == item) { |
126 |
int i; |
127 |
|
128 |
/* Allocate a bunch of items at once to reduce fragmentation */ |
129 |
item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC); |
130 |
if (NULL == item) |
131 |
return NULL; |
132 |
|
133 |
/* |
134 |
* Link together all the new items except the first one |
135 |
* (which we'll return to the caller) for placement on |
136 |
* the freelist. |
137 |
*/ |
138 |
for (i = 2; i < ITEMS_PER_ALLOC; i++) |
139 |
item[i - 1].next = &item[i]; |
140 |
|
141 |
pthread_mutex_lock(&cqi_freelist_lock); |
142 |
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist; |
143 |
cqi_freelist = &item[1]; |
144 |
pthread_mutex_unlock(&cqi_freelist_lock); |
145 |
} |
146 |
|
147 |
return item; |
148 |
} |
149 |
|
150 |
|
151 |
/* |
152 |
* Frees a connection queue item (adds it to the freelist.) |
153 |
*/ |
154 |
static void cqi_free(CQ_ITEM *item) { |
155 |
pthread_mutex_lock(&cqi_freelist_lock); |
156 |
item->next = cqi_freelist; |
157 |
cqi_freelist = item; |
158 |
pthread_mutex_unlock(&cqi_freelist_lock); |
159 |
} |
160 |
|
161 |
|
162 |
/* |
163 |
* Creates a worker thread. |
164 |
*/ |
165 |
static void create_worker(void *(*func)(void *), void *arg) { |
166 |
pthread_t thread; |
167 |
pthread_attr_t attr; |
168 |
int ret; |
169 |
|
170 |
pthread_attr_init(&attr); |
171 |
|
172 |
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) { |
173 |
fprintf(stderr, "Can't create thread: %s\n", |
174 |
strerror(ret)); |
175 |
exit(1); |
176 |
} |
177 |
} |
178 |
|
179 |
/* |
180 |
* Sets whether or not we accept new connections. |
181 |
*/ |
182 |
void accept_new_conns(const bool do_accept) { |
183 |
pthread_mutex_lock(&conn_lock); |
184 |
do_accept_new_conns(do_accept); |
185 |
pthread_mutex_unlock(&conn_lock); |
186 |
} |
187 |
/****************************** LIBEVENT THREADS *****************************/ |
188 |
|
189 |
/* |
190 |
* Set up a thread's information. |
191 |
*/ |
192 |
static void setup_thread(LIBEVENT_THREAD *me) { |
193 |
me->base = event_init(); |
194 |
if (! me->base) { |
195 |
fprintf(stderr, "Can't allocate event base\n"); |
196 |
exit(1); |
197 |
} |
198 |
|
199 |
/* Listen for notifications from other threads */ |
200 |
event_set(&me->notify_event, me->notify_receive_fd, |
201 |
EV_READ | EV_PERSIST, thread_libevent_process, me); |
202 |
event_base_set(me->base, &me->notify_event); |
203 |
|
204 |
if (event_add(&me->notify_event, 0) == -1) { |
205 |
fprintf(stderr, "Can't monitor libevent notify pipe\n"); |
206 |
exit(1); |
207 |
} |
208 |
|
209 |
me->new_conn_queue = malloc(sizeof(struct conn_queue)); |
210 |
if (me->new_conn_queue == NULL) { |
211 |
perror("Failed to allocate memory for connection queue"); |
212 |
exit(EXIT_FAILURE); |
213 |
} |
214 |
cq_init(me->new_conn_queue); |
215 |
|
216 |
if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { |
217 |
perror("Failed to initialize mutex"); |
218 |
exit(EXIT_FAILURE); |
219 |
} |
220 |
|
221 |
me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*), |
222 |
NULL, NULL); |
223 |
if (me->suffix_cache == NULL) { |
224 |
fprintf(stderr, "Failed to create suffix cache\n"); |
225 |
exit(EXIT_FAILURE); |
226 |
} |
227 |
} |
228 |
|
229 |
|
230 |
/* |
231 |
* Worker thread: main event loop |
232 |
*/ |
233 |
static void *worker_libevent(void *arg) { |
234 |
LIBEVENT_THREAD *me = arg; |
235 |
|
236 |
/* Any per-thread setup can happen here; thread_init() will block until |
237 |
* all threads have finished initializing. |
238 |
*/ |
239 |
|
240 |
pthread_mutex_lock(&init_lock); |
241 |
init_count++; |
242 |
pthread_cond_signal(&init_cond); |
243 |
pthread_mutex_unlock(&init_lock); |
244 |
|
245 |
event_base_loop(me->base, 0); |
246 |
return NULL; |
247 |
} |
248 |
|
249 |
|
250 |
/* |
251 |
* Processes an incoming "handle a new connection" item. This is called when |
252 |
* input arrives on the libevent wakeup pipe. |
253 |
*/ |
254 |
static void thread_libevent_process(int fd, short which, void *arg) { |
255 |
LIBEVENT_THREAD *me = arg; |
256 |
CQ_ITEM *item; |
257 |
char buf[1]; |
258 |
|
259 |
if (read(fd, buf, 1) != 1) |
260 |
if (settings.verbose > 0) |
261 |
fprintf(stderr, "Can't read from libevent pipe\n"); |
262 |
|
263 |
item = cq_pop(me->new_conn_queue); |
264 |
|
265 |
if (NULL != item) { |
266 |
conn *c = conn_new(item->sfd, item->init_state, item->event_flags, |
267 |
item->read_buffer_size, item->transport, me->base); |
268 |
if (c == NULL) { |
269 |
if (IS_UDP(item->transport)) { |
270 |
fprintf(stderr, "Can't listen for events on UDP socket\n"); |
271 |
exit(1); |
272 |
} else { |
273 |
if (settings.verbose > 0) { |
274 |
fprintf(stderr, "Can't listen for events on fd %d\n", |
275 |
item->sfd); |
276 |
} |
277 |
close(item->sfd); |
278 |
} |
279 |
} else { |
280 |
c->thread = me; |
281 |
} |
282 |
cqi_free(item); |
283 |
} |
284 |
} |
285 |
|
286 |
/* Which thread we assigned a connection to most recently. */ |
287 |
static int last_thread = -1; |
288 |
|
289 |
/* |
290 |
* Dispatches a new connection to another thread. This is only ever called |
291 |
* from the main thread, either during initialization (for UDP) or because |
292 |
* of an incoming connection. |
293 |
*/ |
294 |
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, |
295 |
int read_buffer_size, enum network_transport transport) { |
296 |
CQ_ITEM *item = cqi_new(); |
297 |
int tid = (last_thread + 1) % settings.num_threads; |
298 |
|
299 |
LIBEVENT_THREAD *thread = threads + tid; |
300 |
|
301 |
last_thread = tid; |
302 |
|
303 |
item->sfd = sfd; |
304 |
item->init_state = init_state; |
305 |
item->event_flags = event_flags; |
306 |
item->read_buffer_size = read_buffer_size; |
307 |
item->transport = transport; |
308 |
|
309 |
cq_push(thread->new_conn_queue, item); |
310 |
|
311 |
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); |
312 |
if (write(thread->notify_send_fd, "", 1) != 1) { |
313 |
perror("Writing to thread notify pipe"); |
314 |
} |
315 |
} |
316 |
|
317 |
/* |
318 |
* Returns true if this is the thread that listens for new TCP connections. |
319 |
*/ |
320 |
int is_listen_thread() { |
321 |
return pthread_self() == dispatcher_thread.thread_id; |
322 |
} |
323 |
|
324 |
/********************************* ITEM ACCESS *******************************/ |
325 |
|
326 |
/* |
327 |
* Allocates a new item. |
328 |
*/ |
329 |
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) { |
330 |
item *it; |
331 |
pthread_mutex_lock(&cache_lock); |
332 |
it = do_item_alloc(key, nkey, flags, exptime, nbytes); |
333 |
pthread_mutex_unlock(&cache_lock); |
334 |
return it; |
335 |
} |
336 |
|
337 |
/* |
338 |
* Returns an item if it hasn't been marked as expired, |
339 |
* lazy-expiring as needed. |
340 |
*/ |
341 |
item *item_get(const char *key, const size_t nkey) { |
342 |
item *it; |
343 |
pthread_mutex_lock(&cache_lock); |
344 |
it = do_item_get(key, nkey); |
345 |
pthread_mutex_unlock(&cache_lock); |
346 |
return it; |
347 |
} |
348 |
|
349 |
/* |
350 |
* Links an item into the LRU and hashtable. |
351 |
*/ |
352 |
int item_link(item *item) { |
353 |
int ret; |
354 |
|
355 |
pthread_mutex_lock(&cache_lock); |
356 |
ret = do_item_link(item); |
357 |
pthread_mutex_unlock(&cache_lock); |
358 |
return ret; |
359 |
} |
360 |
|
361 |
/* |
362 |
* Decrements the reference count on an item and adds it to the freelist if |
363 |
* needed. |
364 |
*/ |
365 |
void item_remove(item *item) { |
366 |
pthread_mutex_lock(&cache_lock); |
367 |
do_item_remove(item); |
368 |
pthread_mutex_unlock(&cache_lock); |
369 |
} |
370 |
|
371 |
/* |
372 |
* Replaces one item with another in the hashtable. |
373 |
* Unprotected by a mutex lock since the core server does not require |
374 |
* it to be thread-safe. |
375 |
*/ |
376 |
int item_replace(item *old_it, item *new_it) { |
377 |
return do_item_replace(old_it, new_it); |
378 |
} |
379 |
|
380 |
/* |
381 |
* Unlinks an item from the LRU and hashtable. |
382 |
*/ |
383 |
void item_unlink(item *item) { |
384 |
pthread_mutex_lock(&cache_lock); |
385 |
do_item_unlink(item); |
386 |
pthread_mutex_unlock(&cache_lock); |
387 |
} |
388 |
|
389 |
/* |
390 |
* Moves an item to the back of the LRU queue. |
391 |
*/ |
392 |
void item_update(item *item) { |
393 |
pthread_mutex_lock(&cache_lock); |
394 |
do_item_update(item); |
395 |
pthread_mutex_unlock(&cache_lock); |
396 |
} |
397 |
|
398 |
/* |
399 |
* Does arithmetic on a numeric item value. |
400 |
*/ |
401 |
enum delta_result_type add_delta(conn *c, item *item, int incr, |
402 |
const int64_t delta, char *buf) { |
403 |
enum delta_result_type ret; |
404 |
|
405 |
pthread_mutex_lock(&cache_lock); |
406 |
ret = do_add_delta(c, item, incr, delta, buf); |
407 |
pthread_mutex_unlock(&cache_lock); |
408 |
return ret; |
409 |
} |
410 |
|
411 |
/* |
412 |
* Stores an item in the cache (high level, obeys set/add/replace semantics) |
413 |
*/ |
414 |
enum store_item_type store_item(item *item, int comm, conn* c) { |
415 |
enum store_item_type ret; |
416 |
|
417 |
pthread_mutex_lock(&cache_lock); |
418 |
ret = do_store_item(item, comm, c); |
419 |
pthread_mutex_unlock(&cache_lock); |
420 |
return ret; |
421 |
} |
422 |
|
423 |
/* |
424 |
* Flushes expired items after a flush_all call |
425 |
*/ |
426 |
void item_flush_expired() { |
427 |
pthread_mutex_lock(&cache_lock); |
428 |
do_item_flush_expired(); |
429 |
pthread_mutex_unlock(&cache_lock); |
430 |
} |
431 |
|
432 |
/* |
433 |
* Dumps part of the cache |
434 |
*/ |
435 |
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) { |
436 |
char *ret; |
437 |
|
438 |
pthread_mutex_lock(&cache_lock); |
439 |
ret = do_item_cachedump(slabs_clsid, limit, bytes); |
440 |
pthread_mutex_unlock(&cache_lock); |
441 |
return ret; |
442 |
} |
443 |
|
444 |
/* |
445 |
* Dumps statistics about slab classes |
446 |
*/ |
447 |
void item_stats(ADD_STAT add_stats, void *c) { |
448 |
pthread_mutex_lock(&cache_lock); |
449 |
do_item_stats(add_stats, c); |
450 |
pthread_mutex_unlock(&cache_lock); |
451 |
} |
452 |
|
453 |
/* |
454 |
* Dumps a list of objects of each size in 32-byte increments |
455 |
*/ |
456 |
void item_stats_sizes(ADD_STAT add_stats, void *c) { |
457 |
pthread_mutex_lock(&cache_lock); |
458 |
do_item_stats_sizes(add_stats, c); |
459 |
pthread_mutex_unlock(&cache_lock); |
460 |
} |
461 |
|
462 |
/******************************* GLOBAL STATS ******************************/ |
463 |
|
464 |
void STATS_LOCK() { |
465 |
pthread_mutex_lock(&stats_lock); |
466 |
} |
467 |
|
468 |
void STATS_UNLOCK() { |
469 |
pthread_mutex_unlock(&stats_lock); |
470 |
} |
471 |
|
472 |
void threadlocal_stats_reset(void) { |
473 |
int ii, sid; |
474 |
for (ii = 0; ii < settings.num_threads; ++ii) { |
475 |
pthread_mutex_lock(&threads[ii].stats.mutex); |
476 |
|
477 |
threads[ii].stats.get_cmds = 0; |
478 |
threads[ii].stats.get_misses = 0; |
479 |
threads[ii].stats.delete_misses = 0; |
480 |
threads[ii].stats.incr_misses = 0; |
481 |
threads[ii].stats.decr_misses = 0; |
482 |
threads[ii].stats.cas_misses = 0; |
483 |
threads[ii].stats.bytes_read = 0; |
484 |
threads[ii].stats.bytes_written = 0; |
485 |
threads[ii].stats.flush_cmds = 0; |
486 |
threads[ii].stats.conn_yields = 0; |
487 |
threads[ii].stats.auth_cmds = 0; |
488 |
threads[ii].stats.auth_errors = 0; |
489 |
|
490 |
for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { |
491 |
threads[ii].stats.slab_stats[sid].set_cmds = 0; |
492 |
threads[ii].stats.slab_stats[sid].get_hits = 0; |
493 |
threads[ii].stats.slab_stats[sid].delete_hits = 0; |
494 |
threads[ii].stats.slab_stats[sid].incr_hits = 0; |
495 |
threads[ii].stats.slab_stats[sid].decr_hits = 0; |
496 |
threads[ii].stats.slab_stats[sid].cas_hits = 0; |
497 |
threads[ii].stats.slab_stats[sid].cas_badval = 0; |
498 |
} |
499 |
|
500 |
pthread_mutex_unlock(&threads[ii].stats.mutex); |
501 |
} |
502 |
} |
503 |
|
504 |
void threadlocal_stats_aggregate(struct thread_stats *stats) { |
505 |
int ii, sid; |
506 |
/* The struct contains a mutex, so I should probably not memset it.. */ |
507 |
stats->get_cmds = 0; |
508 |
stats->get_misses = 0; |
509 |
stats->delete_misses = 0; |
510 |
stats->incr_misses = 0; |
511 |
stats->decr_misses = 0; |
512 |
stats->cas_misses = 0; |
513 |
stats->bytes_written = 0; |
514 |
stats->bytes_read = 0; |
515 |
stats->flush_cmds = 0; |
516 |
stats->conn_yields = 0; |
517 |
stats->auth_cmds = 0; |
518 |
stats->auth_errors = 0; |
519 |
|
520 |
memset(stats->slab_stats, 0, |
521 |
sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES); |
522 |
|
523 |
for (ii = 0; ii < settings.num_threads; ++ii) { |
524 |
pthread_mutex_lock(&threads[ii].stats.mutex); |
525 |
|
526 |
stats->get_cmds += threads[ii].stats.get_cmds; |
527 |
stats->get_misses += threads[ii].stats.get_misses; |
528 |
stats->delete_misses += threads[ii].stats.delete_misses; |
529 |
stats->decr_misses += threads[ii].stats.decr_misses; |
530 |
stats->incr_misses += threads[ii].stats.incr_misses; |
531 |
stats->cas_misses += threads[ii].stats.cas_misses; |
532 |
stats->bytes_read += threads[ii].stats.bytes_read; |
533 |
stats->bytes_written += threads[ii].stats.bytes_written; |
534 |
stats->flush_cmds += threads[ii].stats.flush_cmds; |
535 |
stats->conn_yields += threads[ii].stats.conn_yields; |
536 |
stats->auth_cmds += threads[ii].stats.auth_cmds; |
537 |
stats->auth_errors += threads[ii].stats.auth_errors; |
538 |
|
539 |
for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { |
540 |
stats->slab_stats[sid].set_cmds += |
541 |
threads[ii].stats.slab_stats[sid].set_cmds; |
542 |
stats->slab_stats[sid].get_hits += |
543 |
threads[ii].stats.slab_stats[sid].get_hits; |
544 |
stats->slab_stats[sid].delete_hits += |
545 |
threads[ii].stats.slab_stats[sid].delete_hits; |
546 |
stats->slab_stats[sid].decr_hits += |
547 |
threads[ii].stats.slab_stats[sid].decr_hits; |
548 |
stats->slab_stats[sid].incr_hits += |
549 |
threads[ii].stats.slab_stats[sid].incr_hits; |
550 |
stats->slab_stats[sid].cas_hits += |
551 |
threads[ii].stats.slab_stats[sid].cas_hits; |
552 |
stats->slab_stats[sid].cas_badval += |
553 |
threads[ii].stats.slab_stats[sid].cas_badval; |
554 |
} |
555 |
|
556 |
pthread_mutex_unlock(&threads[ii].stats.mutex); |
557 |
} |
558 |
} |
559 |
|
560 |
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) { |
561 |
int sid; |
562 |
|
563 |
out->set_cmds = 0; |
564 |
out->get_hits = 0; |
565 |
out->delete_hits = 0; |
566 |
out->incr_hits = 0; |
567 |
out->decr_hits = 0; |
568 |
out->cas_hits = 0; |
569 |
out->cas_badval = 0; |
570 |
|
571 |
for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { |
572 |
out->set_cmds += stats->slab_stats[sid].set_cmds; |
573 |
out->get_hits += stats->slab_stats[sid].get_hits; |
574 |
out->delete_hits += stats->slab_stats[sid].delete_hits; |
575 |
out->decr_hits += stats->slab_stats[sid].decr_hits; |
576 |
out->incr_hits += stats->slab_stats[sid].incr_hits; |
577 |
out->cas_hits += stats->slab_stats[sid].cas_hits; |
578 |
out->cas_badval += stats->slab_stats[sid].cas_badval; |
579 |
} |
580 |
} |
581 |
|
582 |
/* |
583 |
* Initializes the thread subsystem, creating various worker threads. |
584 |
* |
585 |
* nthreads Number of worker event handler threads to spawn |
586 |
* main_base Event base for main thread |
587 |
*/ |
588 |
void thread_init(int nthreads, struct event_base *main_base) { |
589 |
int i; |
590 |
|
591 |
pthread_mutex_init(&cache_lock, NULL); |
592 |
pthread_mutex_init(&stats_lock, NULL); |
593 |
|
594 |
pthread_mutex_init(&init_lock, NULL); |
595 |
pthread_cond_init(&init_cond, NULL); |
596 |
|
597 |
pthread_mutex_init(&cqi_freelist_lock, NULL); |
598 |
cqi_freelist = NULL; |
599 |
|
600 |
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); |
601 |
if (! threads) { |
602 |
perror("Can't allocate thread descriptors"); |
603 |
exit(1); |
604 |
} |
605 |
|
606 |
dispatcher_thread.base = main_base; |
607 |
dispatcher_thread.thread_id = pthread_self(); |
608 |
|
609 |
for (i = 0; i < nthreads; i++) { |
610 |
int fds[2]; |
611 |
if (pipe(fds)) { |
612 |
perror("Can't create notify pipe"); |
613 |
exit(1); |
614 |
} |
615 |
|
616 |
threads[i].notify_receive_fd = fds[0]; |
617 |
threads[i].notify_send_fd = fds[1]; |
618 |
|
619 |
setup_thread(&threads[i]); |
620 |
} |
621 |
|
622 |
/* Create threads after we've done all the libevent setup. */ |
623 |
for (i = 0; i < nthreads; i++) { |
624 |
create_worker(worker_libevent, &threads[i]); |
625 |
} |
626 |
|
627 |
/* Wait for all the threads to set themselves up before returning. */ |
628 |
pthread_mutex_lock(&init_lock); |
629 |
while (init_count < nthreads) { |
630 |
pthread_cond_wait(&init_cond, &init_lock); |
631 |
} |
632 |
pthread_mutex_unlock(&init_lock); |
633 |
} |
634 |
|