25 #include <sys/types.h>
40 #include "debug_lock.h"
41 #include "shm_arena.h"
43 #include "arena_lock.h"
47 #define MAGIC (0xC30F53AD)
50 #define TIMESTAMPED (01 << 0)
54 #define UNTIMESTAMPED (-(LDBL_MAX))
57 typedef int16_t q_length_t;
60 #define MAX_Q_LENGTH (32766)
67 pthread_mutex_t mutex;
75 q_length_t write_index;
84 q_length_t wrap_index;
88 char entries_seg_name[GEN_NAME_SIZE];
104 struct smq_header *header;
108 long double *timestamp;
110 q_length_t read_index;
117 pthread_mutex_t thread_count_mutex;
122 pthread_cond_t thread_count_cond;
134 uint8_t *entries_seg;
202 int connect(
struct smq *q)
212 if(q->header->wrap_index > q->header->q_length)
215 ASSERT(q->header->wrap_index == q->header->q_length + 1);
217 q->read_index = q->header->write_index + 1;
218 if(q->read_index > q->header->q_length)
220 ASSERT(q->read_index == q->header->q_length + 1);
226 q->read_index = q->header->write_index - q->header->wrap_index;
227 if(q->read_index < 0)
228 q->read_index += q->header->q_length + 1;
231 else if(q->q_length != q->header->q_length)
233 ASSERT(q->q_length < q->header->q_length);
236 if(q->read_index > q->header->write_index)
237 q->read_index += q->header->q_length - q->q_length;
240 entry_size = CHUNKS(q->header->element_size) +
241 ((q->header->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0);
243 q->entries_seg = (uint8_t *)
244 shm_get(q->arena, (q->header->q_length + 1)*entry_size,
250 if((q->flags & TIMESTAMPED) &&
251 (q->header->flags & TIMESTAMPED) && q->q_length < 0)
258 while(( ( *((
long double *)
260 (q->read_index*entry_size +
261 CHUNKS(q->header->element_size)))))
263 q->read_index != q->header->write_index)
268 if(q->read_index > q->header->q_length)
275 "%sonnected to Shared multi-queue "
276 "with queue length %d with entry segment \"%s\" "
278 (q->q_length < 0)?
"C":
"Rec", q->header->q_length,
279 q->header->entries_seg_name,
280 ((q->header->flags & TIMESTAMPED)?
"with":
"without"));
287 q->flags = q->header->flags;
289 q->q_length = q->header->q_length;
359 int q_length,
const char *name,
int flags)
361 struct smq *q = NULL;
364 SPEW(_INFO,
"%s(arena=%p, element_size=%zu, queue_length=%d,"
365 "name=\"%s\", flags=%d )", __func__, arena, element_size,
366 q_length, name, flags);
368 if(q_length > MAX_Q_LENGTH)
369 return SPEW_SYS_ERR_RET(NULL, _WARN, EINVAL,
370 "%s(,,q_length=%d,,) failed: "
371 "the max q_length is %d",
372 __func__, q_length, MAX_Q_LENGTH);
375 return SPEW_SYS_ERR_RET(NULL, _WARN, EINVAL,
376 "%s(,element_size=SHM_UNKNOWN_SIZE,"
377 ",,) failed: flag O_CREAT should not "
378 "be set to connect to unknow size "
379 "element", __func__);
383 if(q_length < 1) q_length = 1;
385 q = (
struct smq *) malloc(
sizeof(
struct smq));
401 pthread_mutex_lock(&_shm_create_arena_mutex);
402 arena = _shm_default_arena;
403 pthread_mutex_unlock(&_shm_create_arena_mutex);
409 q->entries_seg = NULL;
412 q->unblock_rdlock = 0;
414 pthread_mutex_init(&q->thread_count_mutex, NULL);
415 pthread_cond_init(&q->thread_count_cond, NULL);
422 q->header = (
struct smq_header *)
423 shm_get(arena,
sizeof(
struct smq_header), name, 0);
429 if(q->header->magic != MAGIC)
431 SPEW_SYS_ERR(_WARN, EINVAL,
"Segment \"%s\" is not a Multi-Queue "
432 "bad magic number", name);
435 if(
sizeof(
struct smq_header) !=
shm_size(arena, q->header))
437 SPEW_SYS_ERR(_WARN, EINVAL,
"Segment \"%s\" is wrong size",
441 if(element_size != q->header->element_size &&
444 SPEW_SYS_ERR(_WARN, EINVAL,
445 "Segment \"%s\" has element_size=%zu not %zu",
446 name, q->header->element_size, element_size);
449 if(connect(q))
goto fail;
453 SPEW_SYS(_INFO,
"Shared Multi-Queue \"%s\" does not exist", name);
456 else if(!errno && flags & O_CREAT)
462 q->header = (
struct smq_header *)
463 shm_get(arena,
sizeof(
struct smq_header), name,
465 if(!q->header)
goto fail;
467 q->entries_seg = (uint8_t *)
469 (CHUNKS(element_size) +
470 ((q->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0)),
472 if(!q->entries_seg ||
473 !
shm_name(arena, q->entries_seg, q->header->entries_seg_name,
481 pthread_condattr_t c_attr;
484 if((err = pthread_condattr_init(&c_attr)))
487 SPEW(_WARN,
"pthread_condattr_init() failed");
490 if((err = pthread_condattr_setpshared(&c_attr,
491 PTHREAD_PROCESS_SHARED)))
494 SPEW(_WARN,
"pthread_condattr_setpshared() failed");
497 if((err = pthread_cond_init(&q->header->cond, &c_attr)))
500 SPEW(_WARN,
"pthread_cond_init() failed");
503 if((err = pthread_condattr_destroy(&c_attr)))
506 SPEW(_WARN,
"pthread_condattr_destroy() failed");
512 pthread_mutexattr_t m_attr;
516 if((err = pthread_mutexattr_init(&m_attr)))
519 SPEW(_WARN,
"pthread_mutexattr_init() failed");
522 if((err = pthread_mutexattr_setpshared(&m_attr,
523 PTHREAD_PROCESS_SHARED)))
526 SPEW(_WARN,
"pthread_mutexattr_setpshared() failed");
529 if((err = pthread_mutex_init(&q->header->mutex, &m_attr)))
532 SPEW(_WARN,
"pthread_mutex_init() failed");
535 if((err = pthread_mutexattr_destroy(&m_attr)))
538 SPEW(_WARN,
"pthread_mutexattr_destroy() failed");
544 q->header->magic = MAGIC;
545 q->header->q_length = q_length;
546 q->header->write_index = 0;
547 q->header->wrap_index = 0;
548 q->header->element_size = element_size;
549 q->header->is_blocking_read = 0;
550 q->header->flags = q->flags;
553 q->q_length = q_length;
555 else if(!errno && !(flags & O_CREAT))
558 SPEW(_WARN,
"Shared Multi-Queue does not exist"
559 " O_CREAT must be set to create it");
566 if(q->header->q_length > q_length)
568 q_length = q->header->q_length;
570 if(q->header->q_length < q_length ||
571 (q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED)))
574 uint8_t *new_entries_seg;
575 size_t new_entry_size, old_entry_size;
577 new_entry_size = CHUNKS(q->header->element_size) +
578 ((q->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0);
579 old_entry_size = CHUNKS(q->header->element_size) +
580 ((q->header->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0);
583 new_entries_seg = (uint8_t *)
584 shm_get(arena, (q_length + 1)*new_entry_size,
590 ASSERT(
shm_name(arena, new_entries_seg,
591 q->header->entries_seg_name, GEN_NAME_SIZE));
594 q->header->entries_seg_name, GEN_NAME_SIZE);
598 if(new_entry_size == old_entry_size)
602 if(q->header->write_index)
604 memcpy(new_entries_seg, q->entries_seg,
605 (q->header->write_index*new_entry_size));
607 ASSERT(q_length > q->header->q_length);
609 memset(new_entries_seg +
610 (q->header->write_index*new_entry_size),
611 0, (q_length - q->header->q_length)*new_entry_size);
613 if(q->header->q_length > q->header->write_index)
615 memcpy(new_entries_seg +
616 (q->header->write_index +
617 (q_length - q->header->q_length)) *
619 q->entries_seg + q->header->write_index *
621 ((q->header->q_length + 1) - q->header->write_index) *
631 memset(new_entries_seg, 0, (q_length + 1)*new_entry_size);
641 (i*new_entry_size + CHUNKS(q->header->element_size)));
645 if(q->header->write_index)
648 max = q->header->q_length - q->header->write_index;
650 memcpy(new_entries_seg + i*new_entry_size,
651 q->entries_seg + i*old_entry_size,
652 q->header->element_size);
655 if(q->header->q_length > q->header->write_index)
658 max = (q->header->q_length + 1) - q->header->write_index;
660 memcpy(new_entries_seg +
661 (q->header->write_index +
662 (q_length - q->header->q_length) +
665 (q->header->write_index + i) * new_entry_size,
666 q->header->element_size);
668 SPEW(_INFO,
"Added time stamping to Shared Multi-Queue");
676 if(q->read_index > q->header->write_index &&
677 !(q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED)))
678 q->read_index += q_length - q->header->q_length - 1;
679 else if(q->flags & TIMESTAMPED && !(q->header->flags & TIMESTAMPED))
684 q->read_index = q->header->write_index;
686 q->header->q_length = q->q_length = q_length;
687 q->header->flags = q->flags;
688 q->entries_seg = new_entries_seg;
721 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
725 SPEW(_WARN,
"Invalid Shared Multi-Queue object");
733 if(!get_arena_and_autolock(q->arena, 2, &err IF_SPEW(, __func__)))
736 SPEW(_WARN,
"Invalid Shared Multi-Queue object");
743 if(q->header->magic != MAGIC)
746 SPEW(_WARN,
"Bad magic number in Shared Multi-Queue segment header");
754 q->header->magic = 0;
767 err = arena_autounlock(q->arena IF_SPEW(, __func__));
769 arena_autounlock(q->arena IF_SPEW(, __func__));
794 return q->header->element_size;
812 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
818 pthread_mutex_lock(&q->thread_count_mutex);
819 while(q->thread_count)
821 pthread_cond_wait(&q->thread_count_cond, &q->thread_count_mutex);
823 pthread_mutex_unlock(&q->thread_count_mutex);
825 pthread_mutex_destroy(&q->thread_count_mutex);
826 pthread_cond_destroy(&q->thread_count_cond);
830 memset(q, 0,
sizeof(
struct smq));
837 static inline int check_connect(
smq_t q)
841 if(q->q_length != q->header->q_length ||
842 q->flags != q->header->flags)
847 static inline int get_num_entries(
smq_t q)
851 if(q->header->write_index >= q->read_index)
852 return q->header->write_index - q->read_index;
854 return 1 + q->q_length +
855 q->header->write_index - q->read_index;
885 SPEW(_DEBUG,
"%s(q=%p, num=%d)", __func__, q, num);
894 ret = check_connect(q);
899 return get_num_entries(q);
905 if(num > q->q_length)
909 if(
shm_rdlock(q->arena, q->header))
return -1;
917 ret = get_num_entries(q);
921 if((err = pthread_mutex_lock(&q->header->mutex)))
925 SPEW(_WARN,
"%s(): pthread_mutex_lock() failed",
930 if(q->unblock_rdlock)
932 q->unblock_rdlock = 0;
933 pthread_mutex_unlock(&q->header->mutex);
937 q->header->is_blocking_read = 1;
944 if(
shm_unlock(q->arena, q->header))
return -1;
946 ret = pthread_cond_wait(&q->header->cond, &q->header->mutex);
949 SPEW(_WARN,
"%s(): pthread_cond_wait() failed", __func__);
950 pthread_mutex_unlock(&q->header->mutex);
957 pthread_mutex_unlock(&q->header->mutex);
965 ret = get_num_entries(q);
972 static void *unblock_rdlock_thread(
void *d)
979 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
981 ret = pthread_mutex_lock(&q->header->mutex);
982 if(ret)
goto thread_exit;
987 q->unblock_rdlock = 1;
989 if(q->header->is_blocking_read)
991 q->header->is_blocking_read = 0;
992 pthread_cond_broadcast(&q->header->cond);
995 pthread_mutex_unlock(&q->header->mutex);
1000 pthread_mutex_lock(&q->thread_count_mutex);
1002 if(q->thread_count == 0)
1003 pthread_cond_broadcast(&q->thread_count_cond);
1004 pthread_mutex_unlock(&q->thread_count_mutex);
1032 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
1034 pthread_mutex_lock(&q->thread_count_mutex);
1036 pthread_mutex_unlock(&q->thread_count_mutex);
1041 ret = pthread_create(&thread, NULL, unblock_rdlock_thread, q);
1043 pthread_detach(thread);
1047 pthread_mutex_lock(&q->thread_count_mutex);
1049 pthread_mutex_unlock(&q->thread_count_mutex);
1076 SPEW(_DEBUG,
"%s(q=%p, num=%d)", __func__, q, num);
1080 ret = check_connect(q);
1104 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
1111 gettimeofday(&t, NULL);
1112 *(q->timestamp) = t.tv_sec + t.tv_usec * 1.0e-6L;
1138 if(q->flags & TIMESTAMPED)
1142 if(q->timestamp) q->timestamp = NULL;
1144 return (
long double *)
1145 (((uint8_t *) ptr) + CHUNKS(q->header->element_size));
1170 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
1177 gettimeofday(&tv, NULL);
1178 *(q->timestamp) = tv.tv_sec + tv.tv_usec * 1.0e-6L;
1183 ((q->header->write_index)*
1184 (CHUNKS(q->header->element_size) +
1185 ((q->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0))));
1187 q->header->write_index++;
1188 if(q->header->write_index > q->q_length)
1189 q->header->write_index = 0;
1190 if(q->header->wrap_index <= q->q_length)
1191 q->header->wrap_index++;
1195 if((err = pthread_mutex_lock(&q->header->mutex)))
1197 SPEW_SYS_ERR_RET(NULL, _WARN, err,
1198 "pthread_mutex_lock() failed");
1199 if(q->header->is_blocking_read)
1201 q->header->is_blocking_read = 0;
1202 pthread_cond_broadcast(&q->header->cond);
1204 pthread_mutex_unlock(&q->header->mutex);
1208 if(ret && (q->flags & TIMESTAMPED))
1210 q->timestamp = (
long double *)
1211 (((uint8_t *) ret) + CHUNKS(q->header->element_size));
1233 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
1235 index = q->header->write_index - 1;
1236 if(index < 0 && q->header->wrap_index)
1237 index = q->q_length;
1238 else if(q->header->wrap_index == 0)
1242 q->read_index = q->header->write_index;
1247 CHUNKS(q->header->element_size) +
1248 ((q->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0))
1264 SPEW(_DEBUG,
"%s(q=%p)", __func__, q);
1268 if(q->read_index == q->header->write_index)
1274 (CHUNKS(q->header->element_size) +
1275 ((q->flags & TIMESTAMPED)?CHUNKS(
sizeof(
long double)):0)));
1278 if(q->read_index > q->q_length)
long double * smq_timestamp(smq_t q, const void *ptr)
read or write the time stamp
smq_t smq_get(shm_arena_t arena, size_t element_size, int q_length, const char *name, int flags)
get a Shared Multi-Queue object
ssize_t shm_size(shm_arena_t arena, const void *ptr)
get the size of a shared memory segment
void * smq_poll(smq_t q)
poll the last entry from the Shared Multi-Queue
#define SHM_UNKNOWN_SIZE
shm_get() flag for when you don't know the segment size
void * shm_get(shm_arena_t arena, size_t size, const char *name, int flags)
get a shared memory segment
int shm_remove(shm_arena_t arena, const void *ptr)
remove a shared memory segment
int shm_unlock(shm_arena_t arena, const void *ptr)
release a shared memory segment read-write lock
int shm_arena_wrlock(shm_arena_t arena)
acquire an arena write lock
size_t smq_element_size(smq_t q)
get the element size of Shared Multi-Queue object
int smq_rdlock(smq_t q, int num)
acquire Shared Multi-Queue read-lock
int smq_unblock_rdlock(smq_t q)
unblock a smq_rdlock() call if needed
#define SHM_TIMESTAMP
smq_get() flag for adding a timestamp to entries
int smq_remove(smq_t q)
remove a Shared Multi-Queue from the arena
void * smq_write(smq_t q)
write an entry into the Shared Multi-Queue
int smq_delete(smq_t q)
delete the local Shared Multi-Queue object
struct smq * smq_t
shared multi-queue object
char * shm_name(shm_arena_t arena, const void *ptr, char *name, size_t name_size)
get the name of a shared memory segment for a pointer
int shm_wrlock(shm_arena_t arena, const void *ptr)
acquire a segment write lock
int shm_arena_unlock(shm_arena_t arena)
release an arena read or write lock
int smq_unlock(smq_t q)
release Shared Multi-Queue read or write lock
int smq_wrlock(smq_t q, int num)
acquire Shared Multi-Queue write-lock
void * smq_read(smq_t q)
read the next entry from the Shared Multi-Queue
#define SHM_WITHOUT_RWLOCK
shm_get() flag for not having a segment read-write lock
int shm_rdlock(shm_arena_t arena, const void *ptr)
acquire a segment read lock