smq.c
1 /*
2  shm-arena shared memory arena
3  Copyright (C) 2006-2008 Lance Arsenault (LGPL v3)
4 
5 
6  This file is part of shm-arena.
7 
8  shm-arena is free software; you can redistribute it and/or modify
9  it under the terms of the GNU Lesser General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12 
13  shm-arena is distributed in the hope that it will be useful, but
14  WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  Lesser General Public License for more details.
17 
18  You should have received a copy of the GNU Lesser General Public
19  License along with this program. If not, see
20  <http://www.gnu.org/licenses/>.
21 */
22 
23 #include "config.h"
24 #include <stdio.h>
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <sys/time.h>
34 #include <limits.h>
35 #include <values.h>
36 #include <pthread.h>
37 
38 #include "spew.h"
39 #include "assert.h"
40 #include "debug_lock.h"
41 #include "shm_arena.h"
42 #include "arena.h"
43 #include "arena_lock.h"
44 #include "avl.h"
45 
46 
47 #define MAGIC (0xC30F53AD)
48 
49 /* struct smq_header flags */
50 #define TIMESTAMPED (01 << 0) /* entry is time-stamped */
51 
52 /* A time stamp values that shows that the time stamp is
53  * uninitialized. */
54 #define UNTIMESTAMPED (-(LDBL_MAX))
55 
56 
57 typedef int16_t q_length_t;
58 
59 /* 2^15-2 is max q length = 32766 */
60 #define MAX_Q_LENGTH (32766)
61 
/* Multi-Queue header segment: one instance of this struct lives in
 * shared memory and is shared by every process attached to the
 * queue.  The cond/mutex pair is initialized PTHREAD_PROCESS_SHARED
 * in smq_get(). */
62 struct smq_header
63 {
64  /* for signaling blocking reads */
65  pthread_cond_t cond;
66  /* for blocking reads */
67  pthread_mutex_t mutex;
68 
 /* set to MAGIC; checked to verify that a segment really is a
  * Multi-Queue header (see smq_get() and smq_remove()) */
69  uint32_t magic;
70  /* number of entries that we can read at most this can increase, but
71  * not decrease */
72  q_length_t q_length;
73  /* the index to the entry that will be written to next starting at
74  * 0. */
75  q_length_t write_index;
76  /* wrap_index is so that a new reading object can read all entries
77  * that have been written. Without something like wrap_index there is no
78  * way to know how much data can be read from the queue since
79  * write_index does not tell you if the queue has wrapped around yet
80  * or not. wrap_index increases from 0 until it is q_length + 1 and
81  * then it never changes unless the queue gets longer. Any new
82  * reader will see if the circular queue is full or if not how much
83  * there is to read. */
84  q_length_t wrap_index;
85  /* element_size is the entry element size. */
86  size_t element_size;
87 
 /* arena name of the entries segment, filled in via shm_name() */
88  char entries_seg_name[GEN_NAME_SIZE];
89 
90  /* Gets set if there is a blocking read happening in some process.
91  * You must get the mutex lock from this struct to read or write to
92  * this. */
93  int is_blocking_read;
94 
95  /* flags keeps a record of how the Shared Multi-Queue is
96  * configured. flags bits: TIMESTAMPED */
97  uint8_t flags;
98 };
99 
/* Per-process Multi-Queue handle returned by smq_get().  This struct
 * itself lives in ordinary heap memory (malloc() in smq_get()); only
 * header and entries_seg point into shared memory. */
100 struct smq
101 {
102  shm_arena_t arena;
103  /* pointer to struct smq_header Shared memory segment */
104  struct smq_header *header;
105  /* timestamp is set to the current write_index where the
106  * timestamp can go. If it is NULL the timestamp does not need to
107  * be set. */
108  long double *timestamp;
109  /* entry index of the next read */
110  q_length_t read_index;
111  /* We need a local copy of the q_length so that we can fix the
112  * read_index and reconnect to the entries shared memory segment if
113  * the q_length in shared memory changes. */
114  q_length_t q_length;
115 
116  /* Used to protect reading and writing of thread_count. */
117  pthread_mutex_t thread_count_mutex;
118  /* Used to sync the smq_delete() with smq_unblock_rdlock()
119  * threads. So that calls to smq_unblock_rdlock() will not signal
120  * until threads are done running, and we don't deadlock the shared
121  * memory multi-queue. */
122  pthread_cond_t thread_count_cond;
123  /* The number of threads running that are trying to unblock a
124  * blocking smq_rdlock() call. There should be one in most use
125  * cases, but we do not restrict it, so there can be any number. */
126  int thread_count;
127 
128  /* This is non-zero if the next smq_rdlock() call is required to not
129  * block as set by smq_unblock_rdlock(). We lock the smq_header
130  * mutex to read or write to unblock_rdlock. */
131  int unblock_rdlock;
132 
133  /* pointer to entries shared memory segment */
134  uint8_t *entries_seg;
135  /* flags keeps a record of how the shared multi-queue is configured.
136  * This must be kept in sync with the flags in struct smq_header in
137  * shared memory. */
138  uint8_t flags;
139 };
140 
141 /********************************************************************
142  Multi-Queue Shared Memory Segments Structure
143 *********************************************************************
144 
145  It uses two segments. The smq_header segment never moves, but
146  the entries segment may move if the multi-queue is re-sized making
147  the queue longer. It's very important for the multi-queue to be
148  able to increase the q_length as a slower reader program needs to
149  read.
150 
151 
152  Header Segment
153 
154  ----------------------------
155  | struct smq_header |
156  ----------------------------
157 
158 
159  Entries Segment
160 
161  ---------------------------- 0
162  | entry element_size |
163  | padding |
164  | long double | if(flags&TIMESTAMPED)
165  | padding |
166  |--------------------------| 1
167  | entry element_size |
168  | padding |
169  | long double | if(flags&TIMESTAMPED)
170  | padding |
171  |--------------------------| 2
172  | entry element_size |
173  | padding |
174  | long double | if(flags&TIMESTAMPED)
175  | padding |
176  |--------------------------|
177  | . |
178  .
179  .
180  .
181  | . |
182  |--------------------------| q_length
183  | entry element_size |
184  | padding |
185  | long double | if(flags&TIMESTAMPED)
186  | padding |
187  ----------------------------
188 
189 
190  Note that there is q_length + 1 entries. That's because this is a
191  circular queue which requires there is one more than the number
192  that can be filled, so that it knows when it is wrapped and full.
193  The padding puts the data boundary at 8 byte word, using CHUNK
194  from arena.h. The long double and its padding is present if
195  flags has the TIMESTAMPED bit set, else it's not present.
196 
197 
198 **********************************************************************/
199 
200 
/* (Re)attach q to the queue's entries segment and compute this
 * object's read_index.  Returns 0 on success, -1 on error.
 * NOTE(review): the name shadows POSIX connect(2); it is static, so
 * linkage stays file-local, but a rename would avoid confusion. */
201 static inline
202 int connect(struct smq *q)
203 /* We have q->header, and connect to the multi-queue. The reason that
204  * we reconnect is when the q_length is increased or flags changes. */
205 {
206  size_t entry_size;
207 
 /* q->q_length < 0 marks a brand-new connection (set in smq_get()). */
208  if(q->q_length < 0)
209  {
210  /* This is a new object connection. Make is so that we can read
211  * as much of the entries as possible */
212  if(q->header->wrap_index > q->header->q_length)
213  {
214  /* The queue is full and wrapping */
215  ASSERT(q->header->wrap_index == q->header->q_length + 1);
216 
217  q->read_index = q->header->write_index + 1;
218  if(q->read_index > q->header->q_length)
219  {
220  ASSERT(q->read_index == q->header->q_length + 1);
221  q->read_index = 0;
222  }
223  }
224  else
225  {
226  q->read_index = q->header->write_index - q->header->wrap_index;
227  if(q->read_index < 0)
228  q->read_index += q->header->q_length + 1;
229  }
230  }
231  else if(q->q_length != q->header->q_length)
232  {
233  ASSERT(q->q_length < q->header->q_length);
234 
235  /* this is a reconnect to a longer queue */
236  if(q->read_index > q->header->write_index)
237  q->read_index += q->header->q_length - q->q_length;
238  }
239 
 /* Per-entry stride: chunk-padded element plus an optional
  * chunk-padded long double time stamp (see the layout comment
  * above). */
240  entry_size = CHUNKS(q->header->element_size) +
241  ((q->header->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0);
242 
243  q->entries_seg = (uint8_t *)
244  shm_get(q->arena, (q->header->q_length + 1)*entry_size,
245  q->header->entries_seg_name, SHM_WITHOUT_RWLOCK);
246 
247  if(!q->entries_seg)
248  return -1; /* error */
249 
250  if((q->flags & TIMESTAMPED) &&
251  (q->header->flags & TIMESTAMPED) && q->q_length < 0)
252  {
253  /* If this a new connection looking for time stamped data we
254  * must make sure that any entries in the queue are time
255  * stamped, if not advance the read_index until it points to
256  * time stamped data or to the current write_index. Un-time
257  * stamped data has value UNTIMESTAMPED */
258  while(( ( *((long double *)
259  (q->entries_seg +
260  (q->read_index*entry_size +
261  CHUNKS(q->header->element_size)))))
262  == UNTIMESTAMPED) &&
263  q->read_index != q->header->write_index)
264  {
265  /* Advance the read_index until read_index == write_index or
266  * we have a good time stamp (tv_usec != -1). */
267  q->read_index++;
268  if(q->read_index > q->header->q_length)
269  q->read_index = 0;
270  }
271  }
272 
273 
274  SPEW(_INFO,
275  "%sonnected to Shared multi-queue "
276  "with queue length %d with entry segment \"%s\" "
277  "%s time stamping",
278  (q->q_length < 0)?"C":"Rec", q->header->q_length,
279  q->header->entries_seg_name,
280  ((q->header->flags & TIMESTAMPED)?"with":"without"));
281 
282  if(q->q_length >= 0)
283  /* Only overwrite q->flags on a reconnection.  On the first
284  * connection (q->q_length < 0 on entry) q->flags is left alone so
285  * that smq_get() can still add time stamping if it was requested
286  * in the flags passed to smq_get(). */
287  q->flags = q->header->flags;
288 
289  q->q_length = q->header->q_length;
290 
291  return 0;
292 }
293 
/* Get (connect to, or with O_CREAT create) the Shared Multi-Queue
 * named `name' in `arena'.  Returns a malloc()ed per-process handle,
 * or NULL on error with errno set.  The whole operation runs under an
 * arena write lock so it is atomic with respect to the arena file.
 * A connection may also grow the queue and/or add time stamping,
 * which migrates all entries to a new, larger entries segment. */
358 smq_t smq_get(shm_arena_t arena, size_t element_size,
359  int q_length, const char *name, int flags)
360 {
361  struct smq *q = NULL;
362  int errno_save;
363 
364  SPEW(_INFO, "%s(arena=%p, element_size=%zu, queue_length=%d,"
365  "name=\"%s\", flags=%d )", __func__, arena, element_size,
366  q_length, name, flags);
367 
368  if(q_length > MAX_Q_LENGTH)
369  return SPEW_SYS_ERR_RET(NULL, _WARN, EINVAL,
370  "%s(,,q_length=%d,,) failed: "
371  "the max q_length is %d",
372  __func__, q_length, MAX_Q_LENGTH);
373 
374  if(element_size == SHM_UNKNOWN_SIZE && (flags & O_CREAT))
375  return SPEW_SYS_ERR_RET(NULL, _WARN, EINVAL,
376  "%s(,element_size=SHM_UNKNOWN_SIZE,"
377  ",,) failed: flag O_CREAT should not "
378  "be set to connect to unknow size "
379  "element", __func__);
380 
381  /* We don't know if this is the entries seg size, but if we create
382  * it or it is smaller now it will be this size when we are done. */
383  if(q_length < 1) q_length = 1;
384 
385  q = (struct smq *) malloc(sizeof(struct smq));
386  if(!q) return NULL;
387 
388  /* We get the lock first to make this all atomic with respect to the
389  * arena file. If the user has an arena write lock already, it's
390  * okay they are recursive. */
391  if(shm_arena_wrlock(arena))
392  {
393  free(q);
394  return NULL;
395  }
396 
397  /* after shm_arena_wrlock() we have an arena or we have a default
398  * arena */
399  if(!arena)
400  {
401  pthread_mutex_lock(&_shm_create_arena_mutex);
402  arena = _shm_default_arena;
403  pthread_mutex_unlock(&_shm_create_arena_mutex);
404  }
 /* Initialize the local (per-process) handle state. */
405  q->arena = arena;
406  q->read_index = 0;
407  q->q_length = -1; /* mark a new connection */
408  q->header = NULL;
409  q->entries_seg = NULL;
410  q->timestamp = NULL;
411  q->flags = ((flags & SHM_TIMESTAMP)?TIMESTAMPED:0);
412  q->unblock_rdlock = 0;
413  q->thread_count = 0;
414  pthread_mutex_init(&q->thread_count_mutex, NULL);
415  pthread_cond_init(&q->thread_count_cond, NULL);
416 
417  /* We need the segment read-write lock for the Multi-Queue header
418  * segment so we will make sure we request them. */
419 
 /* errno is used below to distinguish "does not exist" from a real
  * shm_get() failure, so save and clear it around the probe. */
420  errno_save = errno;
421  errno = 0;
422  q->header = (struct smq_header *)
423  shm_get(arena, sizeof(struct smq_header), name, 0);
424 
425  if(q->header)
426  {
427  errno = errno_save;
428  /* the header segment exists already */
429  if(q->header->magic != MAGIC)
430  {
431  SPEW_SYS_ERR(_WARN, EINVAL, "Segment \"%s\" is not a Multi-Queue "
432  "bad magic number", name);
433  goto fail;
434  }
435  if(sizeof(struct smq_header) != shm_size(arena, q->header))
436  {
437  SPEW_SYS_ERR(_WARN, EINVAL, "Segment \"%s\" is wrong size",
438  name);
439  goto fail;
440  }
441  if(element_size != q->header->element_size &&
442  element_size != SHM_UNKNOWN_SIZE)
443  {
444  SPEW_SYS_ERR(_WARN, EINVAL,
445  "Segment \"%s\" has element_size=%zu not %zu",
446  name, q->header->element_size, element_size);
447  goto fail;
448  }
449  if(connect(q)) goto fail;
450  }
451  else if(element_size == SHM_UNKNOWN_SIZE)
452  {
453  SPEW_SYS(_INFO, "Shared Multi-Queue \"%s\" does not exist", name);
454  goto fail;
455  }
456  else if(!errno && flags & O_CREAT)
457  {
458  errno = errno_save;
459  /* the header segment does not exist already */
460  /* make a new Shared Multi-Queue */
461 
462  q->header = (struct smq_header *)
463  shm_get(arena, sizeof(struct smq_header), name,
464  flags & (~SHM_WITHOUT_RWLOCK) & (~SHM_TIMESTAMP));
465  if(!q->header) goto fail;
466 
467  q->entries_seg = (uint8_t *)
468  shm_get(arena, (q_length + 1)*
469  (CHUNKS(element_size) +
470  ((q->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0)),
471  NULL, flags|O_EXCL|SHM_WITHOUT_RWLOCK);
472  if(!q->entries_seg ||
473  !shm_name(arena, q->entries_seg, q->header->entries_seg_name,
474  GEN_NAME_SIZE))
475  {
476  shm_remove(arena, q->entries_seg);
477  goto fail;
478  }
479 
 /* Make the condition variable usable across processes. */
480  {
481  pthread_condattr_t c_attr;
482  int err;
483 
484  if((err = pthread_condattr_init(&c_attr)))
485  {
486  errno = err;
487  SPEW(_WARN, "pthread_condattr_init() failed");
488  goto fail;
489  }
490  if((err = pthread_condattr_setpshared(&c_attr,
491  PTHREAD_PROCESS_SHARED)))
492  {
493  errno = err;
494  SPEW(_WARN, "pthread_condattr_setpshared() failed");
495  goto fail;
496  }
497  if((err = pthread_cond_init(&q->header->cond, &c_attr)))
498  {
499  errno = err;
500  SPEW(_WARN, "pthread_cond_init() failed");
501  goto fail;
502  }
503  if((err = pthread_condattr_destroy(&c_attr)))
504  {
505  errno = err;
506  SPEW(_WARN, "pthread_condattr_destroy() failed");
507  goto fail;
508  }
509 
510  }
 /* Likewise for the process-shared mutex. */
511  {
512  pthread_mutexattr_t m_attr;
513  int err;
514 
515 
516  if((err = pthread_mutexattr_init(&m_attr)))
517  {
518  errno = err;
519  SPEW(_WARN, "pthread_mutexattr_init() failed");
520  goto fail;
521  }
522  if((err = pthread_mutexattr_setpshared(&m_attr,
523  PTHREAD_PROCESS_SHARED)))
524  {
525  errno = err;
526  SPEW(_WARN, "pthread_mutexattr_setpshared() failed");
527  goto fail;
528  }
529  if((err = pthread_mutex_init(&q->header->mutex, &m_attr)))
530  {
531  errno = err;
532  SPEW(_WARN, "pthread_mutex_init() failed");
533  goto fail;
534  }
535  if((err = pthread_mutexattr_destroy(&m_attr)))
536  {
537  errno = err;
538  SPEW(_WARN, "pthread_mutexattr_destroy() failed");
539  goto fail;
540  }
541  }
542 
543  /* We are creating the Shared Multi-Queue now. */
544  q->header->magic = MAGIC;
545  q->header->q_length = q_length;
546  q->header->write_index = 0;
547  q->header->wrap_index = 0;
548  q->header->element_size = element_size;
549  q->header->is_blocking_read = 0;
550  q->header->flags = q->flags;
551 
552  q->read_index = 0;
553  q->q_length = q_length;
554  }
555  else if(!errno && !(flags & O_CREAT))
556  {
557  /* some other failure and errno is set */
558  SPEW(_WARN, "Shared Multi-Queue does not exist"
559  " O_CREAT must be set to create it");
560  errno = EINVAL; /* "Invalid argument" */
561  goto fail;
562  }
563  else /* errno && flags & O_CREAT */
564  goto fail;
565 
566  if(q->header->q_length > q_length)
567  /* don't let the queue length get smaller */
568  q_length = q->header->q_length;
569 
570  if(q->header->q_length < q_length ||
571  (q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED)))
572  {
573  /* Increase q_length and/or add a time-stamp */
574  uint8_t *new_entries_seg;
575  size_t new_entry_size, old_entry_size;
576 
577  new_entry_size = CHUNKS(q->header->element_size) +
578  ((q->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0);
579  old_entry_size = CHUNKS(q->header->element_size) +
580  ((q->header->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0);
581 
582  /* make the new entries segment */
583  new_entries_seg = (uint8_t *)
584  shm_get(arena, (q_length + 1)*new_entry_size,
585  NULL, O_CREAT|O_EXCL|SHM_WITHOUT_RWLOCK);
 /* NOTE(review): new_entries_seg is not checked for NULL here; if
  * this shm_get() fails, the shm_name()/memcpy()/memset() calls
  * below dereference a NULL pointer.  Suggest
  * `if(!new_entries_seg) goto fail;'. */
586 
587 #ifdef SHM_DEBUG
588  /* this should not fail because we have an arena write lock and
589  * we just got this segment */
590  ASSERT(shm_name(arena, new_entries_seg,
591  q->header->entries_seg_name, GEN_NAME_SIZE));
592 #else
593  shm_name(arena, new_entries_seg,
594  q->header->entries_seg_name, GEN_NAME_SIZE);
595 #endif
596 
597  /* copy the old entries to the new entries_seg */
598  if(new_entry_size == old_entry_size)
599  {
600  /* Just the queue length changed. */
601 
602  if(q->header->write_index)
603  /* copy the top of the queue */
604  memcpy(new_entries_seg, q->entries_seg,
605  (q->header->write_index*new_entry_size));
606 
607  ASSERT(q_length > q->header->q_length);
608  /* zero middle of the queue */
609  memset(new_entries_seg +
610  (q->header->write_index*new_entry_size),
611  0, (q_length - q->header->q_length)*new_entry_size);
612 
613  if(q->header->q_length > q->header->write_index)
614  /* copy the bottom of the queue */
615  memcpy(new_entries_seg +
616  (q->header->write_index +
617  (q_length - q->header->q_length)) *
618  new_entry_size,
619  q->entries_seg + q->header->write_index *
620  new_entry_size,
621  ((q->header->q_length + 1) - q->header->write_index) *
622  new_entry_size);
623  }
624  else /* new_entry_size > old_entry_size */
625  {
626  /* We added a time-stamp and the queue length may or may not
627  * be longer. */
628  q_length_t i, max;
629  /* Start with the whole queue zero-ed. The old entries
630  * time-stamps will all be zero. */
631  memset(new_entries_seg, 0, (q_length + 1)*new_entry_size);
632 
633  /* mark the unset time stamps with UNTIMESTAMPED so that we
634  * may see that it is not used yet. */
635  max = q_length+1;
636  for(i=0;i<max;i++)
637  {
638  long double *tv;
639  tv = (long double *)
640  (new_entries_seg +
641  (i*new_entry_size + CHUNKS(q->header->element_size)));
642  *tv = UNTIMESTAMPED;
643  }
644 
645  if(q->header->write_index)
646  {
647  /* copy the top of the queue */
648  max = q->header->q_length - q->header->write_index;
649  for(i=0;i<max;i++)
650  memcpy(new_entries_seg + i*new_entry_size,
651  q->entries_seg + i*old_entry_size,
652  q->header->element_size);
653  }
654 
655  if(q->header->q_length > q->header->write_index)
656  {
657  /* copy the bottom of the queue */
658  max = (q->header->q_length + 1) - q->header->write_index;
659  for(i=0;i<max;i++)
660  memcpy(new_entries_seg +
661  (q->header->write_index +
662  (q_length - q->header->q_length) +
663  i) * new_entry_size,
664  q->entries_seg +
665  (q->header->write_index + i) * new_entry_size,
666  q->header->element_size);
667  }
668  SPEW(_INFO, "Added time stamping to Shared Multi-Queue");
669  }
670 
671  /* remove the old entries segment */
672  if(shm_remove(arena, q->entries_seg))
673  goto fail;
674 
675  /* Set the read_index */
676  if(q->read_index > q->header->write_index &&
677  !(q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED)))
678  q->read_index += q_length - q->header->q_length - 1;
679  else if(q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED))
680  /* If we just added time-stamping we set the read_index so
681  * that the queue looks like all the current entries have been
682  * read, so that we can't read any entries that are not
683  * time-stamped. */
684  q->read_index = q->header->write_index;
685 
686  q->header->q_length = q->q_length = q_length;
687  q->header->flags = q->flags;
688  q->entries_seg = new_entries_seg;
689  }
690 
691  if(shm_arena_unlock(arena))
692  {
693  free(q);
694  q = NULL;
695  }
696 
697  return q;
698 
699  fail:
 /* NOTE(review): a header/entries segment that was just created is
  * not removed on every failure path, so it may be left behind in
  * the arena -- confirm that is intended. */
700 
701  shm_arena_unlock(arena);
702  free(q);
703  return NULL;
704 }
705 
/* Body of smq_remove(q) (the signature is outside this view):
 * removes the Multi-Queue's header and entries segments from the
 * arena and frees the local handle.  Returns 0 on success or -1 with
 * errno set. */
718 {
719  int ret;
720  int err = 0;
721  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
722 
723  if(!q || !q->arena)
724  {
725  SPEW(_WARN, "Invalid Shared Multi-Queue object");
726  errno = EINVAL; /* "Invalid argument" */
727  return -1;
728  }
729 
730  /* We need an exclusive arena write lock to remove shared memory
731  * segments. We use the auto write locker in case the user has the
732  * arena write lock already. */
733  if(!get_arena_and_autolock(q->arena, 2, &err IF_SPEW(, __func__)))
734  {
735  errno = err;
736  SPEW(_WARN, "Invalid Shared Multi-Queue object");
737  return -1;
738  }
739 
 /* Take the queue write lock so no reader/writer is in the middle of
  * an entry while we validate and tear the queue down. */
740  ret = smq_wrlock(q, 0);
741  if(ret) goto unlock;
742 
743  if(q->header->magic != MAGIC)
744  {
745  shm_unlock(q->arena, q->header);
746  SPEW(_WARN, "Bad magic number in Shared Multi-Queue segment header");
747  errno = EINVAL; /* "Invalid argument" */
748  ret = -1;
749  goto unlock;
750  }
751 
752  shm_unlock(q->arena, q->header);
753 
 /* Poison the magic so any other attached process can see that this
  * segment is no longer a valid Multi-Queue header. */
754  q->header->magic = 0;
755 
756  ret = shm_remove(q->arena, q->header);
757 
 /* Remove the entries segment either way; keep the first error. */
758  if(!ret)
759  ret = shm_remove(q->arena, q->entries_seg);
760  else
761  shm_remove(q->arena, q->entries_seg);
762 
763 
764  unlock:
765 
 /* Drop the arena lock either way; keep the first error code. */
766  if(!ret)
767  err = arena_autounlock(q->arena IF_SPEW(, __func__));
768  else
769  arena_autounlock(q->arena IF_SPEW(, __func__));
770 
771  if(err)
772  {
773  errno = err;
774  ret = -1;
775  }
776 
777  free(q);
778 
779  return ret;
780 }
781 
/* Body of smq_element_size(q): returns the per-entry element size
 * recorded in the shared header. */
793 {
794  return q->header->element_size;
795 }
796 
/* Body of smq_delete(q): waits for any outstanding
 * unblock_rdlock_thread() workers, destroys the local (process
 * private) mutex/cond, scrubs the handle and frees it.  Shared
 * memory is left untouched.  Always returns 0. */
811 {
812  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
813 
814  /* Wait until all unblock_rdlock_thread() threads are finished.
815  * This could keep the inter-process shared memory from getting
816  * deadlocked due to the thread getting killed by a process exit
817  * when the thread has a lock. */
818  pthread_mutex_lock(&q->thread_count_mutex);
819  while(q->thread_count)
820  {
821  pthread_cond_wait(&q->thread_count_cond, &q->thread_count_mutex);
822  }
823  pthread_mutex_unlock(&q->thread_count_mutex);
824 
825  pthread_mutex_destroy(&q->thread_count_mutex);
826  pthread_cond_destroy(&q->thread_count_cond);
827 
828  /* Lets make the object be really unusable for really stupid
829  * users. */
830  memset(q, 0, sizeof(struct smq));
831 
832  free(q);
833 
834  return 0;
835 }
836 
837 static inline int check_connect(smq_t q)
838 // We need a shm_rdlock(q->arena, q->header)
839 // to call this.
840 {
841  if(q->q_length != q->header->q_length ||
842  q->flags != q->header->flags)
843  return connect(q);
844  return 0;
845 }
846 
847 static inline int get_num_entries(smq_t q)
848 // We need a shm_rdlock(q->arena, q->header)
849 // to call this.
850 {
851  if(q->header->write_index >= q->read_index)
852  return q->header->write_index - q->read_index;
853  else
854  return 1 + q->q_length +
855  q->header->write_index - q->read_index;
856 }
857 
858 
881 int smq_rdlock(smq_t q, int num)
882 {
883  int ret, err;
884 
885  SPEW(_DEBUG, "%s(q=%p, num=%d)", __func__, q, num);
886 
887  if(num < 1) /* nonblocking read */
888  {
889  int ret;
890 
891  ret = shm_rdlock(q->arena, q->header);
892  if(ret) return -1;
893 
894  ret = check_connect(q);
895 
896  if(ret)
897  shm_unlock(q->arena, q->header);
898  else
899  return get_num_entries(q);
900 
901  return -1;
902  }
903 
904 
905  if(num > q->q_length)
906  num = q->q_length;
907 
908  /* blocking read */
909  if(shm_rdlock(q->arena, q->header)) return -1;
910 
911  if(check_connect(q))
912  {
913  shm_unlock(q->arena, q->header);
914  return -1;
915  }
916 
917  ret = get_num_entries(q);
918 
919  while(ret < num)
920  {
921  if((err = pthread_mutex_lock(&q->header->mutex)))
922  {
923  shm_unlock(q->arena, q->header);
924  errno = err;
925  SPEW(_WARN, "%s(): pthread_mutex_lock() failed",
926  __func__); /* We're screwed */
927  return -1;
928  }
929 
930  if(q->unblock_rdlock)
931  {
932  q->unblock_rdlock = 0;
933  pthread_mutex_unlock(&q->header->mutex);
934  return ret;
935  }
936 
937  q->header->is_blocking_read = 1;
938 
939  // TODO: We need to keep the arena read lock
940  // but release the segment lock; but we can't
941  // until we fix how we create shared
942  // memory, it would hang all shm_arena_wrlock()
943  // calls.
944  if(shm_unlock(q->arena, q->header)) return -1; /* screwed */
945 
946  ret = pthread_cond_wait(&q->header->cond, &q->header->mutex);
947  if(ret)
948  {
949  SPEW(_WARN, "%s(): pthread_cond_wait() failed", __func__);
950  pthread_mutex_unlock(&q->header->mutex);
951  shm_arena_unlock(q->arena);
952  return -1;
953  }
954 
955  ret = shm_rdlock(q->arena, q->header);
956 
957  pthread_mutex_unlock(&q->header->mutex);
958 
959  if(check_connect(q))
960  {
961  shm_unlock(q->arena, q->header);
962  return -1;
963  }
964 
965  ret = get_num_entries(q);
966  }
967 
968  return ret;
969 }
970 
971 /* This is the worker thread that smq_unblock_rdlock() spawns. */
972 static void *unblock_rdlock_thread(void *d)
973 {
 /* d is the smq_t handle passed in by smq_unblock_rdlock(). */
974  smq_t q;
975  int ret;
976 
977  q = (smq_t) d;
978 
979  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
980 
981  ret = pthread_mutex_lock(&q->header->mutex);
982  if(ret) goto thread_exit; /* We're screwed */
983 
984  /* We'll keep the next smq_rdlock() from calling pthread_cond_wait()
985  * or we'll call pthread_cond_broadcast(). Either way we will keep
986  * the current and the next smq_rdlock() call from blocking. */
987  q->unblock_rdlock = 1;
988 
989  if(q->header->is_blocking_read)
990  {
991  q->header->is_blocking_read = 0;
992  pthread_cond_broadcast(&q->header->cond);
993  }
994 
995  pthread_mutex_unlock(&q->header->mutex);
996 
997  thread_exit:
998 
999  /* Let others know we are done. */
 /* smq_delete() waits on thread_count_cond until this count reaches
  * zero, so decrement even when the mutex lock above failed. */
1000  pthread_mutex_lock(&q->thread_count_mutex);
1001  q->thread_count--;
1002  if(q->thread_count == 0)
1003  pthread_cond_broadcast(&q->thread_count_cond);
1004  pthread_mutex_unlock(&q->thread_count_mutex);
1005 
1006  return NULL;
1007 }
1008 
1009 
/* Body of smq_unblock_rdlock(q) (signature is outside this view):
 * spawns a detached worker thread to wake a blocking smq_rdlock().
 * Returns 0, or the pthread_create() error code on failure. */
1028 {
1029  int ret;
1030  pthread_t thread;
1031 
1032  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
1033 
 /* Count the worker so smq_delete() can wait for it to finish. */
1034  pthread_mutex_lock(&q->thread_count_mutex);
1035  q->thread_count++;
1036  pthread_mutex_unlock(&q->thread_count_mutex);
1037 
1038  /* By doing the mutex locking and cond broadcast in a
1039  * different thread we don't run the risk of dead-lock due to
1040  * making back-to-back locking calls in the same thread. */
1041  ret = pthread_create(&thread, NULL, unblock_rdlock_thread, q);
1042  if(!ret)
1043  pthread_detach(thread);
1044  else
1045  {
1046  /* The thread was not created */
1047  pthread_mutex_lock(&q->thread_count_mutex);
1048  q->thread_count--;
1049  pthread_mutex_unlock(&q->thread_count_mutex);
1050  }
1051 
1052  return ret;
1053 }
1054 
1073 int smq_wrlock(smq_t q, int num)
1074 {
1075  int ret;
1076  SPEW(_DEBUG, "%s(q=%p, num=%d)", __func__, q, num);
1077  ret = shm_wrlock(q->arena, q->header);
1078  if(ret) return ret;
1079 
1080  ret = check_connect(q);
1081  if(ret)
1082  shm_unlock(q->arena, q->header);
1083 
1084  return ret;
1085 }
1086 
/* Body of smq_unlock(q) (signature is outside this view): writes the
 * pending time stamp left by smq_write(), if any, then releases the
 * header segment lock.  NOTE(review): q->timestamp is not cleared
 * here; only smq_write() and smq_timestamp() change it -- confirm a
 * second smq_unlock() without an intervening smq_write() cannot
 * re-stamp a stale entry. */
1103 {
1104  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
1105 
1106  /* If we are time-stamping and we haven't time-stamped the last
1107  * entry we need to do it now. */
1108  if(q->timestamp)
1109  {
1110  struct timeval t;
1111  gettimeofday(&t, NULL);
 /* seconds since the epoch as a long double */
1112  *(q->timestamp) = t.tv_sec + t.tv_usec * 1.0e-6L;
1113  }
1114 
1115  return shm_unlock(q->arena, q->header);
1116 }
1117 
1136 long double *smq_timestamp(smq_t q, const void *ptr)
1137 {
1138  if(q->flags & TIMESTAMPED)
1139  {
1140  /* If this is a writer calling this, we will assume that they
1141  * will write a time-stamp, so we will mark it as done. */
1142  if(q->timestamp) q->timestamp = NULL;
1143 
1144  return (long double *)
1145  (((uint8_t *) ptr) + CHUNKS(q->header->element_size));
1146  }
1147  return NULL;
1148 }
1149 
/* Body of smq_write(q) (signature is outside this view): returns a
 * pointer to the next entry slot and advances write_index (and
 * wrap_index until the ring is full).  Caller is expected to hold
 * the write lock via smq_wrlock().  Also wakes any blocking
 * smq_rdlock() callers.  When time stamping is on, q->timestamp is
 * pointed at this entry's stamp slot; it gets filled in at
 * smq_unlock() unless the caller stamps it via smq_timestamp(). */
1166 {
1167  void *ret;
1168  int err = 0;
1169 
1170  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
1171 
1172  /* If we are time-stamping and we haven't time-stamped the last
1173  * entry we need to do it now. */
1174  if(q->timestamp)
1175  {
1176  struct timeval tv;
1177  gettimeofday(&tv, NULL);
1178  *(q->timestamp) = tv.tv_sec + tv.tv_usec * 1.0e-6L;
1179  }
1180 
 /* Address of the slot we hand out: write_index times the per-entry
  * stride (chunk-padded element plus optional stamp). */
1181  ret = (void *)
1182  (q->entries_seg +
1183  ((q->header->write_index)*
1184  (CHUNKS(q->header->element_size) +
1185  ((q->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0))));
1186 
 /* Advance, wrapping past the extra (q_length + 1)-th ring slot. */
1187  q->header->write_index++;
1188  if(q->header->write_index > q->q_length)
1189  q->header->write_index = 0;
1190  if(q->header->wrap_index <= q->q_length)
1191  q->header->wrap_index++;
1192 
1193 
1194  /* Unblock a smq_rdlock() if it is needed. */
1195  if((err = pthread_mutex_lock(&q->header->mutex)))
1196  return
1197  SPEW_SYS_ERR_RET(NULL, _WARN, err,
1198  "pthread_mutex_lock() failed"); /* We're screwed */
1199  if(q->header->is_blocking_read)
1200  {
1201  q->header->is_blocking_read = 0;
1202  pthread_cond_broadcast(&q->header->cond);
1203  }
1204  pthread_mutex_unlock(&q->header->mutex);
1205 
1206 
1207 
1208  if(ret && (q->flags & TIMESTAMPED))
1209  /* This is the only place that q->timestamp is ever set. */
1210  q->timestamp = (long double *)
1211  (((uint8_t *) ret) + CHUNKS(q->header->element_size));
1212 
1213  return ret;
1214 }
1215 
/* Return the most recently written entry and mark the whole queue as
 * read for this object, or NULL if nothing has ever been written.
 * NOTE(review): presumably called with the queue lock held (see
 * smq_rdlock()/smq_wrlock()) -- confirm with callers. */
1229 void *smq_poll(smq_t q)
1230 {
1231  q_length_t index;
1232 
1233  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
1234 
 /* The last written entry is one before write_index, wrapping to
  * the top ring slot (q_length) when write_index is 0 and data has
  * been written (wrap_index != 0). */
1235  index = q->header->write_index - 1;
1236  if(index < 0 && q->header->wrap_index)
1237  index = q->q_length;
1238  else if(q->header->wrap_index == 0)
1239  return NULL; /* there is no data written yet */
1240 
1241  /* flush the queue */
1242  q->read_index = q->header->write_index;
1243 
1244  return (void *)
1245  (q->entries_seg +
1246  index*(
1247  CHUNKS(q->header->element_size) +
1248  ((q->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0))
1249  );
1250 }
1251 
1262 void *smq_read(smq_t q)
1263 {
1264  SPEW(_DEBUG, "%s(q=%p)", __func__, q);
1265  void *ret;
1266 
1267  /* read queue */
1268  if(q->read_index == q->header->write_index)
1269  return NULL;
1270 
1271  ret = (void *)
1272  (q->entries_seg +
1273  (q->read_index)*
1274  (CHUNKS(q->header->element_size) +
1275  ((q->flags & TIMESTAMPED)?CHUNKS(sizeof(long double)):0)));
1276 
1277  q->read_index++;
1278  if(q->read_index > q->q_length)
1279  q->read_index = 0;
1280 
1281  return ret;
1282 }
long double * smq_timestamp(smq_t q, const void *ptr)
read or write the time stamp
Definition: smq.c:1136
smq_t smq_get(shm_arena_t arena, size_t element_size, int q_length, const char *name, int flags)
get a Shared Multi-Queue object
Definition: smq.c:358
ssize_t shm_size(shm_arena_t arena, const void *ptr)
get the size of a shared memory segment
Definition: size_name.c:63
void * smq_poll(smq_t q)
poll the last entry from the Shared Multi-Queue
Definition: smq.c:1229
#define SHM_UNKNOWN_SIZE
shm_get() flag for when you don't know the segment size
Definition: shm_arena.h:165
void * shm_get(shm_arena_t arena, size_t size, const char *name, int flags)
get a shared memory segment
Definition: get.c:437
int shm_remove(shm_arena_t arena, const void *ptr)
remove a shared memory segment
Definition: remove.c:312
int shm_unlock(shm_arena_t arena, const void *ptr)
release a shared memory segment read-write lock
Definition: lock.c:268
int shm_arena_wrlock(shm_arena_t arena)
acquire an arena write lock
Definition: lock.c:80
size_t smq_element_size(smq_t q)
get the element size of Shared Multi-Queue object
Definition: smq.c:792
int smq_rdlock(smq_t q, int num)
acquire Shared Multi-Queue read-lock
Definition: smq.c:881
int smq_unblock_rdlock(smq_t q)
unblock a smq_rdlock() call if needed
Definition: smq.c:1027
#define SHM_TIMESTAMP
smq_get() flag for adding a timestamp to entries
Definition: shm_arena.h:158
int smq_remove(smq_t q)
remove a Shared Multi-Queue from the arena
Definition: smq.c:717
void * smq_write(smq_t q)
write an entry into the Shared Multi-Queue
Definition: smq.c:1165
int smq_delete(smq_t q)
delete the local Shared Multi-Queue object
Definition: smq.c:810
struct smq * smq_t
shared multi-queue object
Definition: shm_arena.h:196
char * shm_name(shm_arena_t arena, const void *ptr, char *name, size_t name_size)
get the name of a shared memory segment for a pointer
Definition: size_name.c:227
int shm_wrlock(shm_arena_t arena, const void *ptr)
acquire a segment write lock
Definition: lock.c:177
int shm_arena_unlock(shm_arena_t arena)
release an arena read or write lock
Definition: lock.c:145
int smq_unlock(smq_t q)
release Shared Multi-Queue read or write lock
Definition: smq.c:1102
int smq_wrlock(smq_t q, int num)
acquire Shared Multi-Queue write-lock
Definition: smq.c:1073
void * smq_read(smq_t q)
read the next entry from the Shared Multi-Queue
Definition: smq.c:1262
#define SHM_WITHOUT_RWLOCK
shm_get() flag for not having segment read-write lock
Definition: shm_arena.h:150
int shm_rdlock(shm_arena_t arena, const void *ptr)
acquire a segment read lock
Definition: lock.c:225

Shared Memory Arena version RC-0.0.25