Windows: How can an interprocess producer-consumer message passing mechanism be protected against corruption due to one side crashing?
I have implemented an interprocess message queue in shared memory for 1 producer and 1 consumer on Windows.
I am using 1 named semaphore to count empty slots, 1 named semaphore to count full slots, and 1 named mutex to protect the data structure in shared memory.
Consider, for example, the consumer side. The producer side is similar. It first waits on the full semaphore (1), then takes a message from the queue under the mutex, and then signals the empty semaphore (2).
The problem:
If the consumer process crashes between (1) and (2), the number of slots in the queue that can be used by the processes is effectively reduced by one. Assume that while the consumer is down, the producer can handle the queue getting filled up (it can either specify a timeout when waiting on the empty semaphore, or specify 0 for no wait).
When the consumer restarts, it can continue to read data from the queue. Data will not have been overrun, but even after it empties all the full slots, the producer will have 1 less empty slot to use.
After multiple such restarts the queue will have no slots that can be used, and no messages can be sent.
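To make the slot leak concrete, here is a minimal single-threaded model of the question's protocol, assuming the two semaphores can be reduced to plain counters. The names sem_model, model_produce, model_consume and model_usable_slots are hypothetical, and no real synchronization happens here; the point is only that each consumer crash between steps (1) and (2) permanently removes one slot from the sum of the two counts.

```c
#include <assert.h>
#include <stdbool.h>

#define QUEUE_LENGTH 4  /* illustrative queue size */

/* Hypothetical model: each named semaphore is reduced to its current count. */
typedef struct {
    int empty_slots;  /* count of the "empty" semaphore */
    int full_slots;   /* count of the "full" semaphore */
} sem_model;

static void model_init(sem_model *m) {
    m->empty_slots = QUEUE_LENGTH;
    m->full_slots = 0;
}

/* Producer: wait(empty), write the message, signal(full).
   Returns false where a zero-timeout wait on "empty" would fail. */
static bool model_produce(sem_model *m) {
    if (m->empty_slots == 0) return false;
    m->empty_slots--;
    m->full_slots++;
    return true;
}

/* Consumer: wait(full) is step (1), signal(empty) is step (2).
   With crash_between_steps, the process dies before step (2) runs. */
static bool model_consume(sem_model *m, bool crash_between_steps) {
    if (m->full_slots == 0) return false;
    m->full_slots--;                        /* step (1) */
    if (crash_between_steps) return true;   /* step (2) never happens */
    m->empty_slots++;                       /* step (2) */
    return true;
}

/* Slots still usable by producer and consumer together. */
static int model_usable_slots(const sem_model *m) {
    assert(m->empty_slots >= 0 && m->full_slots >= 0);
    return m->empty_slots + m->full_slots;
}
```

Starting from QUEUE_LENGTH = 4, four produce/crashing-consume cycles leave model_usable_slots() at 0, after which model_produce() always fails: the state described above in which no messages can be sent.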
Question:
How can this situation be avoided or recovered from?
Here's an outline of one simple approach, using events rather than semaphores:
```c
DWORD increment_offset(DWORD offset)
{
    offset++;
    if (offset == queue_length * 2) offset = 0;
    return offset;
}

void consumer(void)
{
    for (;;)
    {
        DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0);

        if ((current_write_offset != *read_offset + queue_length) &&
            (current_write_offset + queue_length != *read_offset))
        {
            // Queue is not full, make sure producer is awake
            SetEvent(signal_producer_event);
        }

        if (*read_offset == current_write_offset)
        {
            // Queue is empty, wait for producer to add a message
            WaitForSingleObject(signal_consumer_event, INFINITE);
            continue;
        }

        MemoryBarrier();
        _ReadWriteBarrier();
        consume((*read_offset) % queue_length);

        InterlockedExchange(read_offset, increment_offset(*read_offset));
    }
}

void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

        if (current_read_offset != *write_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }

        if ((*write_offset == current_read_offset + queue_length) ||
            (*write_offset + queue_length == current_read_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal_producer_event, INFINITE);
            continue;
        }

        produce((*write_offset) % queue_length);

        MemoryBarrier();
        _ReadWriteBarrier();
        InterlockedExchange(write_offset, increment_offset(*write_offset));
    }
}
```

Notes:
The code posted compiles (given appropriate declarations), but I have not otherwise tested it.
read_offset is a pointer to a DWORD in shared memory, indicating which slot should be read from next. Similarly, write_offset points to a DWORD in shared memory indicating which slot should be written to next.

An offset of queue_length + x refers to the same slot as an offset of x, so as to disambiguate between a full queue and an empty queue. That's why the increment_offset() function checks for queue_length*2 rather than queue_length, and why we take the modulo when calling the consume() and produce() functions. (One alternative approach would be to modify the producer to never use the last available slot, but that wastes a slot.)

signal_consumer_event and signal_producer_event must be automatic-reset events. Note that setting an event that is already set is a no-op. The consumer only waits on its event if the queue is actually empty, and the producer only waits on its event if the queue is actually full.
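A minimal sketch of the offset arithmetic described above, in portable C with uint32_t standing in for DWORD. QUEUE_LENGTH = 4 and the helper names slot_of, queue_empty and queue_full are illustrative assumptions; the full test is the same pair of comparisons used in the answer's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4u  /* illustrative queue size */

/* Offsets run over [0, 2*QUEUE_LENGTH); x and x + QUEUE_LENGTH name the same slot. */
static uint32_t increment_offset(uint32_t offset) {
    assert(offset < QUEUE_LENGTH * 2);
    offset++;
    if (offset == QUEUE_LENGTH * 2) offset = 0;
    return offset;
}

/* Map an offset to the physical slot index, as done before consume()/produce(). */
static uint32_t slot_of(uint32_t offset) {
    return offset % QUEUE_LENGTH;
}

/* Equal offsets: nothing has been written that has not been read. */
static bool queue_empty(uint32_t read_offset, uint32_t write_offset) {
    return read_offset == write_offset;
}

/* Same slot but offsets differ by QUEUE_LENGTH: the writer is a full lap ahead. */
static bool queue_full(uint32_t read_offset, uint32_t write_offset) {
    return write_offset == read_offset + QUEUE_LENGTH ||
           write_offset + QUEUE_LENGTH == read_offset;
}
```

For example, with read_offset 6 and write_offset 2 both offsets map to slot 2, yet the queue is full rather than empty because 2 + 4 == 6. With a plain 0..QUEUE_LENGTH-1 offset, those two states would be indistinguishable.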
When either process is woken, it must recheck the state of the queue, because there is a race condition that can lead to a spurious wakeup.
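Win32 events are not available outside Windows, so the sketch below is a hypothetical in-process stand-in for an auto-reset event, built on a pthread mutex and condition variable (auto_event and its functions are illustrative names, not an existing API). It shows the two properties the notes rely on: setting an already-set event is a no-op, and a successful wait resets the event.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical in-process stand-in for a Win32 auto-reset event. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool signaled;
} auto_event;

static void auto_event_init(auto_event *e) {
    int rc = pthread_mutex_init(&e->lock, NULL);
    assert(rc == 0);
    rc = pthread_cond_init(&e->cond, NULL);
    assert(rc == 0);
    (void)rc;  /* unused when NDEBUG is defined */
    e->signaled = false;
}

/* Like SetEvent: setting an already-set event just leaves it set. */
static void auto_event_set(auto_event *e) {
    pthread_mutex_lock(&e->lock);
    e->signaled = true;
    pthread_cond_signal(&e->cond);  /* wake a waiter; only one can consume the signal */
    pthread_mutex_unlock(&e->lock);
}

/* Like WaitForSingleObject(..., INFINITE) on an auto-reset event:
   block until signaled, then reset the event as part of the wait. */
static void auto_event_wait(auto_event *e) {
    pthread_mutex_lock(&e->lock);
    while (!e->signaled)  /* condition variables can also wake spuriously */
        pthread_cond_wait(&e->cond, &e->lock);
    e->signaled = false;  /* automatic reset */
    pthread_mutex_unlock(&e->lock);
}

/* Like a wait with a zero timeout: true (and reset) only if it was signaled. */
static bool auto_event_try_wait(auto_event *e) {
    pthread_mutex_lock(&e->lock);
    bool was_signaled = e->signaled;
    e->signaled = false;
    pthread_mutex_unlock(&e->lock);
    return was_signaled;
}
```

Because a wakeup only means "the queue may have changed", the answer's loops follow every wait with continue, so the offsets are re-read before anything is done with the queue.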
Because we use interlocked operations, and because only one process at a time is using any particular slot, there is no need for a mutex. I've included memory barriers to ensure that the changes the producer writes to a slot will be seen by the consumer. If you're not comfortable with lock-free code, you'll find that it is trivial to convert the algorithm shown to use a mutex instead.
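For readers outside Windows, the same single-producer, single-consumer algorithm can be sketched with C11 stdatomic.h: a release store publishes an offset and an acquire load observes it, in place of InterlockedExchange, InterlockedCompareExchange and MemoryBarrier. This is a swapped-in technique, not the answer's code. It is in-process only, it omits the events (try_produce and try_consume simply return false instead of waiting), and the spsc_queue layout is a hypothetical example. Placing it in real shared memory would additionally require the atomics to be lock-free (atomic_is_lock_free).

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4u  /* illustrative queue size */

/* Hypothetical layout; in the real design this would live in shared memory. */
typedef struct {
    _Atomic uint32_t read_offset;   /* next offset to read, in [0, 2*QUEUE_LENGTH) */
    _Atomic uint32_t write_offset;  /* next offset to write, in [0, 2*QUEUE_LENGTH) */
    int slots[QUEUE_LENGTH];
} spsc_queue;

static void spsc_init(spsc_queue *q) {
    atomic_init(&q->read_offset, 0);
    atomic_init(&q->write_offset, 0);
}

static uint32_t increment_offset(uint32_t offset) {
    assert(offset < QUEUE_LENGTH * 2);
    return offset + 1 == QUEUE_LENGTH * 2 ? 0 : offset + 1;
}

static bool is_full(uint32_t r, uint32_t w) {
    return w == r + QUEUE_LENGTH || w + QUEUE_LENGTH == r;
}

/* Producer: fill the slot first, then publish it with a release store. */
static bool try_produce(spsc_queue *q, int message) {
    uint32_t w = atomic_load_explicit(&q->write_offset, memory_order_relaxed); /* only we write it */
    uint32_t r = atomic_load_explicit(&q->read_offset, memory_order_acquire);  /* consumer is done with freed slots */
    if (is_full(r, w)) return false;
    q->slots[w % QUEUE_LENGTH] = message;
    atomic_store_explicit(&q->write_offset, increment_offset(w), memory_order_release);
    return true;
}

/* Consumer: observe the published offset, copy the message, then free the slot. */
static bool try_consume(spsc_queue *q, int *message) {
    uint32_t r = atomic_load_explicit(&q->read_offset, memory_order_relaxed);  /* only we write it */
    uint32_t w = atomic_load_explicit(&q->write_offset, memory_order_acquire); /* slot contents visible */
    if (r == w) return false;  /* empty */
    *message = q->slots[r % QUEUE_LENGTH];
    atomic_store_explicit(&q->read_offset, increment_offset(r), memory_order_release);
    return true;
}
```

The ordering matters for crash safety as well as visibility: the slot is filled before write_offset is published, and read_offset is advanced only after the message has been copied out, so a crash at any point leaves each offset either at its old value or its new one.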
Note that InterlockedCompareExchange(pointer, 0, 0); looks a bit complicated, but it is just a thread-safe equivalent of *pointer, i.e., it reads the value at the pointer. Similarly, InterlockedExchange(pointer, value); is the same as *pointer = value; but thread-safe. Depending on the compiler and target architecture, the interlocked operations may not be strictly necessary, but the performance impact is negligible, so I recommend programming defensively.
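The equivalence described above can be illustrated with C11 atomics. In the hypothetical helper read_via_cas, a compare-exchange whose expected and desired values are both 0 either stores 0 over an existing 0 or fails and copies the current value into expected, so it always returns the current value without changing it. In portable C11 code you would normally just call atomic_load and atomic_store; the trick is shown only to explain the Win32 idiom.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Analogue of InterlockedCompareExchange(p, 0, 0): "if *p == 0, store 0".
   That never changes *p, and either way expected ends up holding *p. */
static uint32_t read_via_cas(_Atomic uint32_t *p) {
    uint32_t expected = 0;
    atomic_compare_exchange_strong(p, &expected, 0);
    return expected;
}

/* Analogue of InterlockedExchange(p, v): an atomic store that returns the old value. */
static uint32_t write_via_exchange(_Atomic uint32_t *p, uint32_t value) {
    return atomic_exchange(p, value);
}
```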
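Consider the case when the consumer crashes during (or before) the call to the consume() function. When the consumer is restarted, it will pick up the same message again and process it as normal. As far as the producer is concerned, nothing unusual has happened, except that the message took longer than usual to be processed. (If consume() had already acted on the message before the crash, it will see that message a second time, so it should tolerate duplicates.) An analogous situation occurs if the producer crashes while creating a message; when it is restarted, the first message it generates will overwrite the incomplete one, and the consumer won't be affected.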
Obviously, if a crash occurs after the call to InterlockedExchange but before the call to SetEvent in either the producer or the consumer, and if the queue was previously empty or full respectively, then the other process will not be woken at that point. However, it will be woken as soon as the crashed process is restarted. You cannot lose slots in the queue, and the processes cannot deadlock.
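The two crash scenarios can be walked through with a single-threaded model, assuming a "crash" is simply a step that never runs and a "restart" is running the loop body again. The names model_queue, consumer_step, producer_step and used_slots are hypothetical, and signal_consumer_event is only a flag recording that SetEvent would have been called. A consumer crash before advancing read_offset makes the restarted consumer see the same message again; a producer crash after publishing delays the wakeup until the top-of-loop check on restart; in both cases all QUEUE_LENGTH slots remain usable.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_LENGTH 4u  /* illustrative queue size */

/* Hypothetical single-threaded model of the event-based queue. */
typedef struct {
    uint32_t read_offset;        /* in [0, 2*QUEUE_LENGTH) */
    uint32_t write_offset;       /* in [0, 2*QUEUE_LENGTH) */
    int slots[QUEUE_LENGTH];
    bool signal_consumer_event;  /* true once SetEvent would have been called */
} model_queue;

static uint32_t increment_offset(uint32_t offset) {
    assert(offset < QUEUE_LENGTH * 2);
    return (offset + 1) % (QUEUE_LENGTH * 2);
}

/* Number of slots holding unread messages (0..QUEUE_LENGTH). */
static uint32_t used_slots(const model_queue *q) {
    return (q->write_offset + QUEUE_LENGTH * 2 - q->read_offset) % (QUEUE_LENGTH * 2);
}

/* One consumer iteration. With crash_before_advance, the message is read
   but read_offset is never advanced, so a restarted consumer reads it again. */
static bool consumer_step(model_queue *q, int *out, bool crash_before_advance) {
    if (q->read_offset == q->write_offset) return false;  /* empty */
    *out = q->slots[q->read_offset % QUEUE_LENGTH];        /* consume() */
    if (!crash_before_advance)
        q->read_offset = increment_offset(q->read_offset);
    return true;
}

/* One producer iteration in the answer's order: wake the consumer if the
   queue is not empty, check for full, write the slot, publish the offset.
   Returns false if the queue was full. */
static bool producer_step(model_queue *q, int message) {
    if (q->read_offset != q->write_offset)
        q->signal_consumer_event = true;                   /* SetEvent() */
    if (used_slots(q) == QUEUE_LENGTH) return false;       /* full */
    q->slots[q->write_offset % QUEUE_LENGTH] = message;    /* produce() */
    q->write_offset = increment_offset(q->write_offset);   /* publish */
    return true;
}
```

For example, a producer that publishes message 10 into an empty queue and then crashes leaves the event unset; its first iteration after restart sets it. A consumer that crashes after reading message 10 reads 10 again after its own restart, and the queue can still hold four messages afterwards.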
I think the simple multiple-producer, single-consumer case would look like this:
```c
void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

        if (current_read_offset != *write_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }

        produce_in_local_cache();

        for (;;)
        {
            claim_mutex();

            // Read offset may have changed, so re-read it
            current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

            if ((*write_offset != current_read_offset + queue_length) &&
                (*write_offset + queue_length != current_read_offset))
            {
                break;  // Queue is not full; we still hold the mutex
            }

            // Queue is full: release the mutex so other producers are not
            // blocked, wait for the consumer to remove a message, then retry
            // with the same cached message
            release_mutex();
            WaitForSingleObject(signal_producer_event, INFINITE);
        }

        copy_from_local_cache_to_shared_memory((*write_offset) % queue_length);

        MemoryBarrier();
        _ReadWriteBarrier();
        InterlockedExchange(write_offset, increment_offset(*write_offset));

        release_mutex();
    }
}
```

If the active producer crashes, the mutex will be detected as abandoned; you can treat this case as if the mutex had been released. If the crashed process got as far as incrementing the write offset, the entry it was adding will be processed as usual; if not, it will be overwritten by whichever producer next claims the mutex. In neither case is any special action needed.
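On Windows, WaitForSingleObject on a mutex whose owner terminated without releasing it returns WAIT_ABANDONED, and the caller is granted ownership. POSIX offers a comparable mechanism in robust mutexes: the next pthread_mutex_lock returns EOWNERDEAD, and the caller marks the mutex usable again with pthread_mutex_consistent. The sketch below is that POSIX analogue, not the Windows code. It uses a thread that exits while holding the lock as a stand-in for a crashed producer process, and claim_mutex here is a hypothetical implementation of the answer's helper. A cross-process version would also set PTHREAD_PROCESS_SHARED and place the mutex in shared memory.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t queue_mutex;

static void init_robust_mutex(pthread_mutex_t *m) {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
    pthread_mutex_init(m, &attr);
    pthread_mutexattr_destroy(&attr);
}

/* Stand-in for a producer that dies while holding the mutex. */
static void *crashing_producer(void *arg) {
    pthread_mutex_lock((pthread_mutex_t *)arg);
    return NULL;  /* thread exits without unlocking */
}

/* Claim the mutex, treating an abandoned mutex as if it had been released.
   Returns 1 if the previous owner died while holding it, 0 otherwise. */
static int claim_mutex(pthread_mutex_t *m) {
    int rc = pthread_mutex_lock(m);
    if (rc == EOWNERDEAD) {
        /* The queue needs no repair: write_offset was either advanced or not. */
        pthread_mutex_consistent(m);
        return 1;
    }
    assert(rc == 0);
    return 0;
}

/* Runs the crash scenario once and reports whether abandonment was detected. */
static int simulate_abandoned_mutex(void) {
    pthread_t t;
    init_robust_mutex(&queue_mutex);
    pthread_create(&t, NULL, crashing_producer, &queue_mutex);
    pthread_join(t, NULL);
    int abandoned = claim_mutex(&queue_mutex);
    pthread_mutex_unlock(&queue_mutex);
    return abandoned;
}
```

Skipping the pthread_mutex_consistent call would leave the mutex permanently unusable once it is unlocked, which is why the abandoned case is handled inside claim_mutex rather than ignored.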