مسئله تولیدکننده-مصرف‌کننده

از ویکی‌پدیا، دانشنامهٔ آزاد
پرش به: ناوبری، جستجو

مسئله تولیدکننده-مصرف‌کننده (به انگلیسی: Producer–consumer problem) یا مسئله بافر محدود (به انگلیسی: bounded-buffer problem) مسئله‌ای کلاسیک در همگام‌سازی چندپروسه‌ای است که این پروسه‌ها یک ناحیه بحرانی مشترک (به انگلیسی: Critical Section) دارند.

تشریح[ویرایش]

در این مسئله، دو فرایند به نام‌های تولیدکننده و مصرف‌کننده وجود دارد که یک بافر با اندازه‌ای ثابت در بین آنها به اشتراک گذاشته شده و آنها از این بافر به عنوان صف استفاده می‌کنند. وظیفه تولیدکننده این است که یک قلم داده تولید کرده و در بافر قرار دهد. به شکل همزمان، مصرف‌کننده باید این قلم داده را از بافر بردارد و آن را به مصرف برساند. تولیدکننده و مصرف‌کننده تنها می‌توانند هر بار یک قلم داده را در بافر قرار دهند یا از بافر بردارند. مسئله اینجاست که ما باید مطمئن شویم که تولیدکننده سعی نمی‌کند در یک بافر پر شده، داده‌ای را ذخیره کند و یا به طور مشابه، مصرف‌کننده سعی نکند داده‌ای را از یک بافر خالی بردارد. هرگاه که بافر پر بود و تولیدکننده خواست چیزی را در آن ذخیره کند، باید به خواب برود تا اینکه مصرف‌کننده یک قلم داده را از بافر بردارد و بعد از آن تولیدکننده را از خواب بیدار کند. به طور مشابه، اگر مصرف‌کننده خواست یک قلم داده را از یک بافر خالی بردارد، باید به خواب برود تا تولیدکنده یک قلم داده را در بافر قرار دهد و سپس مصرف‌کننده را بیدار کند. یک راه حل نامناسب برای این مسئله، می‌تواند به بن‌بست منجر شود که در آن هر دو پروس همزمان به خواب می‌روند و هیچکس نیست تا آنها را از خواب بیدار کند که باید جلوی این اتفاق را گرفت. این مسئله را می‌توان به n تولیدکننده و m مصرف‌کننده تعمیم داد.

پیاده‌سازی‌ها[ویرایش]

پیاده‌سازی نادرست[ویرایش]

برای حل این مشکل، ممکن است یک برنامه‌نویس ناوارد راه حل زیر را ارائه دهد. در این راه حل، دو رویه کتابخانه‌ای به نام sleep و wakeup مورد استفاده قرار گرفته‌اند. وقتی که فرایندی رویه sleep را فراخوانی می‌کند، فرایند فراخوان به خواب رفته تا زمانی که فرایند دیگر آن را توسط رویه wakeup از خواب بیدار کند. متغیر سراسری itemCount تعداد اقلام موجود در بافر را نشان می‌دهد.

int itemCount = 0;
 
procedure producer() {
    while (true) {
        item = produceItem();
 
        if (itemCount == BUFFER_SIZE) {
            sleep();
        }
 
        putItemIntoBuffer(item);
        itemCount = itemCount + 1;
 
        if (itemCount == 1) {
            wakeup(consumer);
        }
    }
}
 
procedure consumer() {
    while (true) {
 
        if (itemCount == 0) {
            sleep();
        }
 
        item = removeItemFromBuffer();
        itemCount = itemCount - 1;
 
        if (itemCount == BUFFER_SIZE - 1) {
            wakeup(producer);
        }
 
        consumeItem(item);
    }
}

مشکلی که در این راه حل وجود دارد این است که این راه حل یک وضعیت رقابتی دارد که می‌تواند منجر به پدیده بن‌بست شود. سناریو زیر را در نظر بگیرید:

  1. مصرف‌کننده متغیر itemCount را می‌خواند و متوجه می‌شود که مقدار آن صفر است. این اتفاق دقیقاً بعد از وارد شدن به بلاک if رخ می‌دهد.
  2. درست قبل از فراخوانی رویه sleep، اجرای مصرف‌کننده توسط پردازنده متوقف شده و اجرای تولیدکننده آغاز می‌شود. (مثلاً در یک سیستم اشتراک زمانی که پردازنده با سرعت زیاد بین فرایندها سوئیچ می‌کند)
  3. تولیدکننده یک قلم داده را تولید کرده و آن را در بافر قرار داده و سپس یک واحد به itemCount اضافه می‌کند.
  4. از آنجا که این متغیر قبلاً صفر بوده است، تولیدکننده سعی می‌کند که مصرف‌کننده را از خواب بیدار کند.
  5. متأسفانه مصرف‌کننده هنوز نخوابیده است و این وسط سیگنال wakeup گم می‌شود. وقتی که اجرای مصرف‌کننده ادامه می‌یابد، مصرف کننده بلافاصله به خواب رفته و دیگر هرگز بیدار نخواهد شد. به این خاطر که تولیدکننده تنها وقتی مصرف‌کننده را بیدار می‌کند که مقدار متغیر itemCount برابر ۱ باشد.
  6. تولیدکننده آنقدر داده در بافر ذخیره می‌کند تا بافر پر شود. سپس خودش هم به خواب می‌رود. اما مصرف‌کننده‌ای نیست که آن را از خواب بیدار کند.

از آنجا که هر دو پروسه به خواب ابدی رفته‌اند، ما وارد بن‌بست شده‌ایم. بنابراین نتیجه این راه حل رضایت‌بخش نیست.

سمافور[ویرایش]

سمافورها مشکل سیگنال wakeup گم‌شده را حل می‌کنند. در راه حل زیر ما از دو سمافور به نام‌های fillCount و emptyCount برای حل این مسئله استفاده می‌کنیم. fillCount تعداد اقلام موجود در بافر هستند که آماده خواندن هستند و emptyCount تعداد فضاهای خالی موجود در بافر است که می‌توان یک قلم داده را در هر فضا قرار داد. fillCount با قرار دادن یک قلم داده جدید در بافر افزایش می‌یابد. emptyCount هم با بداشتن یک قلم داده از بافر کاهش می‌یابد. اگر emptyCount صفر باشد و تولیدکننده بخواهد آن را کاهش دهد، تولید کننده به خواب خواهد رفت. دفعه بعد که یک قلم داده از بافر برداشته و مصرف شد، emptyCount یک واحد افزایش می‌یابد و تولیدکننده از خواب بیدار می‌شود. مصرف‌کننده هم به شکل مشابهی رفتار می‌کند.

semaphore fillCount = 0; // items produced
semaphore emptyCount = BUFFER_SIZE; // remaining space
 
procedure producer() {
    while (true) {
        item = produceItem();
        down(emptyCount);
            putItemIntoBuffer(item);
        up(fillCount);
    }
}
 
procedure consumer() {
    while (true) {
        down(fillCount);
            item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}

وقتی که تنها یک مصرف کننده و یک تولید کننده وجود دارد، راه حل بالا به خوبی کار می‌کند. برای حالتی که چند تولیدکننده قسمت یکسانی از حافظه را به عنوان بافر به اشتراک می‌گذارند، یا چند مصرف‌کننده فضای یکسانی از حافظه را به اشتراک می‌گذارند، این راه حل یک وضعیت رقابتی جدی دارد که باعث می‌شود دو یا چند فرایند سعی کنند از یک قسمت بافر، داده‌ای را به شکل همزمان بخوانند یا بنویسند که ما را به سمت بن‌بست می‌کشاند. برای نشان دادن این مسئله، به پیاده‌سازی روال putItemIntoBuffer()‎ دقت کنید. این روال دربرگیرنده دو عمل است. یکی تعیین می‌کند که اسلات خالی بعدی کدام است و دیگری داده‌ای را در آن اسلات می‌نویسد. اگر این روال به شکل همروند توسط چند تولیدکننده اجرا شود، ممکن است سناریو زیر اتفاق بیفتند:

  • دو تولیدکننده متغیر emptyCount را کاهش می‌دهند.
  • یکی از تولیدکننده‌ها اسلات خالی بعدی در بافر را تعیین می‌کند.
  • تولید کننده بعدی هم اسلات خالی بافر را تعیین می‌کند و همان نتیجه‌ای را می‌گیرد که تولید کننده قبلی گرفته است.
  • هر دو تولید کننده سعی می‌کنند در یک اسلات مشابه از بافر داده‌ای را قرار دهند.

برای حل این مشکل ما باید مطمئن شویم که تنها یک تولیدکننده می‌تواند در هر بار روال putItemIntoBuffer()‎ را اجرا کند. اگر تولیدکننده‌ای مشغول اجرای این روال بود، تولیدکننده‌های دیگر باید منتظر بمانند تا تولیدکننده اول از این روال خارج شود. به عبارتی دیگر ما باید راهی پیدا کنیم که ناحیه بحرانی را با انحصار متقابل اجرا کنیم. برای انجام دادن این کار، ما از یک سمافور دودویی بنام mutex استفاده می‌کنیم. از آنجا که مقدار یک سمافور باینری تنها می‌تواند صفر یا ۱ باشد، تنها یک پروسه می‌تواند در هر بار down(mutex)‎ یا up(mutex)‎ را اجرا کند. راه حل مسئله بالا برای چند تولیدکننده و چند مصرف‌کننده در زیر ارائه شده است:

semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;
 
procedure producer() {
    while (true) {
        item = produceItem();
        down(emptyCount);
            down(mutex);
                putItemIntoBuffer(item);
            up(mutex);
        up(fillCount);
    }
}
 
procedure consumer() {
    while (true) {
        down(fillCount);
            down(mutex);
                item = removeItemFromBuffer();
            up(mutex);
        up(emptyCount);
        consumeItem(item);
    }
}

ترتیبی که سمافورهای مختلف افزایش یا کاهش یافته‌اند بسیار مهم است. اگر ترتیب افزایش یا کاهش سمافورها تغییر کند، وارد بن‌بست خواهیم شد.

مثال‌ها[ویرایش]

مثال از سی++[ویرایش]

 -pthread -lpthread -o pc
*/
#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
using namespace std;
 
// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout <<s.rdbuf(); cout.flush(); s.clear(); }
 
//      Constants
//
const int num_producers = 5;
const int num_consumers = 10;
const int producer_delay_to_produce = 10;   // in milliseconds
const int consumer_delay_to_consume = 30;   // in milliseconds
 
const int consumer_max_wait_time = 200;     // in milliseconds - max time that a 
                                            // consumer can wait for a product to be produced.
 
const int max_production = 10;              // When producers has produced this quantity 
                                            // they will stop to produce
const int max_products = 10;                // Maximum number of products that can be stored
 
//
//      Variables
//
atomic<int> num_producers_working(0);       // When there's no producer working the consumers 
                                            // will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry
 
condition_variable is_not_full;             // to indicate that our stack is not full between 
                                            // the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between 
                                            // the thread operations
 
//
//      Functions
//
 
//      Produce function, producer_id will produce a product
void produce(int producer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = products.size();
        products.push(product);
 
        print(stringstream() <<"Producer " <<producer_id <<" produced " <<product <<"\n");
        is_not_empty.notify_all();
}
 
//      Consume function, consumer_id will consume a product
void consume(int consumer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;
 
        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size()> 0; }))
        {
                product = products.top();
                products.pop();
 
                print(stringstream() <<"Consumer " <<consumer_id <<" consumed " <<product <<"\n");
                is_not_full.notify_all();
        }
}
 
//      Producer function, this is the body of a producer thread
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i <max_production; ++i)
        {
                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }
 
        print(stringstream() <<"Producer " <<id <<" has exited\n");
        --num_producers_working;
}
 
//      Consumer function, this is the body of a consumer thread
void consumer(int id)
{
        // Wait until there is any producer working
        while(num_producers_working == 0) this_thread::yield();
 
        while(num_producers_working != 0 || products.size()> 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }
 
        print(stringstream() <<"Consumer " <<id <<" has exited\n");
}
 
//
//      Main
//
int main()
{
        vector<thread> producers_and_consumers;
 
        // Create producers
        for(int i = 0; i <num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));
 
        // Create consumers
        for(int i = 0; i <num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));
 
        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
}

مثال از جاوا[ویرایش]

import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 1 producer and 3 consumers producing/consuming 10 items
 */
public class ProducerConsumer {
 
        Stack<Integer> items = new Stack<Integer>();
        final static int NO_ITEMS = 10;
 
        public static void main(String args[]) {
                ProducerConsumer pc = new ProducerConsumer();
                Thread t1 = new Thread(pc.new Producer());
                Consumer consumer  = pc.new Consumer();
                Thread t2 = new Thread(consumer);
                Thread t3 = new Thread(consumer);
                Thread t4 = new Thread(consumer);
                t1.start();
                try {
                        Thread.sleep(100);
                } catch (InterruptedException e1) {
                        e1.printStackTrace();
                }
                t2.start();
                t3.start();
                t4.start();
                try {
                        t2.join();
                        t3.join();
                        t4.join();
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
        }
 
        class Producer implements Runnable {
 
                public void produce(int i) {
                        System.out.println("Producing " + i);
                        items.push(new Integer(i));
                }
 
                public void run() {
                        int i = 0;
                        // produce 10 items
                        while (i++ <NO_ITEMS) {
                                synchronized (items) {
                                        produce(i);
                                        items.notifyAll();
                                }
                                try {
                                        // sleep for some time, 
                                        Thread.sleep(10);
                                } catch (InterruptedException e) {
                                }
                        }
                }
        }
 
        class Consumer implements Runnable {
                //consumed counter to allow the thread to stop
                AtomicInteger consumed = new AtomicInteger();
 
                public void consume() {
                        if (!items.isEmpty()) {
                                System.out.println("Consuming " + items.pop());
                                consumed.incrementAndGet();
                        }
                }
 
                private boolean theEnd() {
                        return consumed.get()>= NO_ITEMS;
                }
 
                public void run() {
                        while (!theEnd()) {
                                synchronized (items) {
                                        while (items.isEmpty() && (!theEnd())) {
                                                try {
                                                        items.wait(10);
                                                } catch (InterruptedException e) {
                                                        Thread.interrupted();
                                                }
                                        }
                                        consume();
 
                                }
                        }
                }
        }
}

منابع[ویرایش]

  • مشارکت‌کنندگان ویکی‌پدیا، «Producer–consumer problem»، ویکی‌پدیای انگلیسی، دانشنامهٔ آزاد (بازیابی در ۳۱ مرداد ۱۳۹۲).