Public void Num_Read()

{         Nc_read++; if (Nc_read>= Nc) Nc_read = 0;

}

publicstringMyDequeue() // первый обратившийся к ресурсу поток, прочитает    

{ if (Nc_read == 0)                                                    //сообщение, удалит его из ресурса, сохранит в

{ if (this.Count>0) last_res = this.Dequeue(); //статическойпеременнойlast_rec. Все остальные

Console.WriteLine ("MyDequeue: " + last_res);  //потоки будут обращаться уже к переменнойlast_rec.

}                                            

}

};

ClassSupNConsM

{ objectlock_obj = new object();                        // объектсинхронизации

publicintNsup = 0;                                                     // числопоставщиков

publicintNcons = 0;                                                   // числопотребителей

publicintNmsg = 0;                                                    // число всех сообщений

MyQueueQchange = new MyQueue();               // объекточереди

publicstring[] res_str;                                              // массив результирующих строк (результирующая строка –

// список сообщений, полученных потребителем)

int N1_sup = 5;                                                           // число сообщений для каждого поставщика

 

voidCons(objectn)                                                        // методпотребителя, аргумент - № потребителя

{ intncons_n = Convert.ToInt32(n);                   // преобразование аргумента к нужному типу

stringres = "Потребитель " + ncons_n;          // строка с результатами работы потока

stringres0 = res;

stringstr = "";                                                              // в str помещается текст, читаемый потоком

stringstr_prev = "";                                                  // в str_prevсохраняется текст, прочитанный ранее

intnread = 0;                                                               // число прочитанных потоком сообщений

bool t;

intT = 0;                                                                         // Общее время ожидания в потоке

while (nread<Nmsg)                                               //Цикл чтения сообщений

{         t = false;

           lock (lock_obj)

           { if (Qchange.Count>= 0) { str = Qchange.MyDequeue();                       // очередноесообщение

           t = (str!= str_prev);                               // если сообщение новое для потока,                  if (t) Qchange.Num_Read();// увеличивается счетчик потребителей,

                          }                                                                        // прочитавших данное сообщение

}

           if (t == false)   { T += 50;                                                   // если сообщение не новое, то

Thread.Sleep(50);                                     // задержка, после чего опять выполняется 

if (T> 5000) break; // попытка чтения

}

           else{str_prev = str;                                                                  // новое сообщение запоминается как nread++;                                                                                 // предыдущее, счетчик сообщений в потоке    

res += "\n" + nread.ToString() + ". " + str;                     // увеличивается, формируется строка

Console.WriteLine("{0}: {1} {2} {3}", res0, str, t, nread);       // выходного сообщения

T = 0;  

}                                                                                                                       // конец цикла чтения сообщений             

res_str[ncons_n] = res;

System.Windows.Forms.MessageBox.Show(res_str[ncons_n]);       // выводрез-овработыпотока

}

void Sup(object num)                                                                 //методпоставщика, аргумент - № поставщика

{string n = Convert.ToString(num);

Random r = new Random();

intqn; 

for (inti = 0; i<N1_sup; i++)

{lock (lock_obj) { Qchange.Enqueue("сообщение № " + (i + 1).ToString() + " от " + n);

                          //qn = Qchange.Count;

           }

//Console.WriteLine("Qchange.Enqueue {0}", qn);

int del = Convert.ToInt32(r.NextDouble()) * 3 + 1;

Thread.Sleep(del);

}

}

Public void Test()

{this.N1_sup = 5;

this.Nmsg = 3 * N1_sup;

this.Qchange.Nc = 5;

res_str = new string[6];

Thread ts1 = new Thread(Sup);

Thread ts2 = new Thread(Sup);

Thread ts3 = new Thread(Sup);

Thread tc1 = new Thread(Cons);

Thread tc2 = new Thread(Cons);

Thread tc3 = new Thread(Cons);

Thread tc4 = new Thread(Cons);

Thread tc5 = new Thread(Cons);

           ts2.Start(2);

           ts1.Start(1);

               

tc2.Start(2);

           tc1.Start(1);

           tc3.Start(3);

           tc4.Start(4);

           tc5.Start(5);

 

           ts3.Start(3);

               

tc1.Join();      tc2.Join(); tc3.Join(); tc4.Join(); tc5.Join();

                          }

} // конец класса

}//конец пр-ва имен

Пулпотоков            http://www.intuit.ru/studies/courses/5938/1074/lecture/16447?page=4

Пул (pool – лужа, бильярд, объединение) потоков п редназначен для упрощения многопоточной обработки. Программист выделяет фрагменты кода (рабочие элементы), которые можно выполнять параллельно.Преимуществами использования пула потоков считают:

Планировщик (среда выполнения) оптимальным образом распределяет рабочие элементы по рабочим потокам пула. Т.е. вопросы эффективной загрузки, оптимального числа потоков решаются не программистом, а планировщиком (исполняющей средой).

При применении пула уменьшаются накладные расходы, связанные с ручным созданием и завершением потоков.

Пул потоков используется для обработки задач типа Task. Объект Taskобладают рядом полезных встроенных механизмов (ожидание наступления события, продолжениеработы и т.д.). Поэтому для распараллеливания рабочих элементов рекомендуется использовать именно задачи или шаблоны класса Parallel. Сам пул потоков представлен классом ThreadPool.

Для добавления рабочего элемента в пул используется метод QueueUserWorkItem:

public static bool QueueUserWorkItem

                 (   WaitCallbackcallBack

)

public static bool QueueUserWorkItem

                (    WaitCallbackcallBack,

           objectdata

)

Данный метод помещает код (метод, рабочий элемент) в очередь на выполнение. Код(метод) выполняется, когда становится доступен поток из пула потоков.

Параметры

Callback          тип - System.Threading.WaitCallback,  делегатWaitCallback, представляющий метод,

                          который требуется выполнить.

data                 тип -System.Object, объект, содержащий данные, используемые методом.

Возвращаемое значение true, если метод успешно помещен в очередь, иначе false. Если рабочий элемент не может быть помещен в очередь, создается исключение NotSupportedException.

 

Делегат System.Threading.WaitCallback в пространстве System.Treading объявлен как
[ ComVisibleAttribute (true) ]

public delegate void WaitCallback (object data)

 

Аргумент есть объект, используемый как аргумент метода, который исполняется делегатом.

Делегат WaitCallback может указывать на любой метод, имеющий один параметр System.Object и не возвращающий ничего.

 

Замечание.COM – это стандарт для взаимодействия модулей на разных языка х. Сборки C# также поддерживают этот стандарт по желанию (разработчик может включить их в свойствах сборки), так что.NET-объекты могут быть использованы как COM-объекты, и таким образом — различными нативными приложениями, а также скриптовыми языками (например, javascript).

Атрибут [ComVisible(true)] означает, что интерфейс будет виден самой подсистеме COM. Если программа не экспортирует COM-объекты, и не работает со скриптами, то можно эти атрибуты игнорировать (если ваша программа полностью на.NET, COM-атрибуты не важны для нее).

 

В следующем примере в пул будут помещены Nрабочих элементов, каждый элемент будет формировать текстовую строку созначениями переданного параметра и параметрами текущего потока. В примере будут использованы свойства и методы классов Thread и ThreadPool:

Thread.CurrentThread.ManagedThreadId – идентификатортекущегопотока,

Thread.CurrentThread.IsThreadPoolThread – признак исполнения потока в пуле(true) или нет (false).

ThreadPool.GetMaxThreads (outmaxThreads, outoiasThreads) – метод определяет максимально возможное число потоков в пуле и максимально возможное число асинхронных потоков ввода-вывода в пуле;

ThreadPool.GetAvailableThreads (outavThreads, outoiasThreads) – метод определяет доступное число потоков в пуле и доступное число асинхронных потоков ввода-вывода в пуле.

Число потоков, находящихся в данный момент в пуле можно определить как разность

maxThreads - avThreads

 

privateintThreadsInPool (bool arg=false)

{ intmaxThreads = 0; //макс. возм.число потоков в пуле

intoiasThreads= 0;                   //макс. возм.число асинхр.потоков ввода-вывода в пуле

intavThreads = 0;                  //число доступных потоков в пуле

ThreadPool.GetMaxThreads(outmaxThreads, out oiasThreads);

ThreadPool.GetAvailableThreads(outavThreads, out oiasThreads);

if (arg == true)

Console.WriteLine("Мак. число потоков в пуле {0}, число доступ. потоков {1}, " +

       "число ассинх. пот. ввода-вывода {2}", maxThreads, avThreads, oiasThreads);

returnmaxThreads - avThreads;

}

constintN = 10;                                           //числорабочих потоков

string[] res = newstring[N];                                  //результатывыполненияраб. элементов

privatevoidF(objectobj)                                        //функция, исполняемая рабочим элементом

{intind = (int)obj;

res[ind] += string.Format("i: {0}, ThreadID: {1,-3}, IsPoolThread:{2}, ThreadsInPool: {3} ",

ind, Thread.CurrentThread.ManagedThreadId,

Thread.CurrentThread.IsThreadPoolThread, ThreadsInPool());

);

Thread.Sleep(100);              //задержкавтелепотока

}

 


PublicvoidTest1()

{ ThreadsInPool(true);                                           //информация о числе потоков в пуле            

intt0 = Environment.TickCount;                                     // фиксация момента времени

for (inti=0; i<N; i++)

{ res[i] = "";  

ThreadPool.QueueUserWorkItem(F, i);          // запускэлементавпуле

}

ThreadsInPool(true);                                              //информация сразу после потоков 

intn = -1;

while (n!=0) 

{ Thread.Sleep(20);                                           // ожидание завершения потоков в пуле

n = ThreadsInPool();

}

ThreadsInPool(true);                                              //информация после завершения потоков 

 

Console.WriteLine("Времявыполнения = {0}", Environment.TickCount - t0);

for (inti = 0; i< N; i++) Console.WriteLine(res[i]);

}

Результаты работы

Макс. число потоков в пуле 2047, число доступ.потоков 2047, число ассинх. пот. ввода-вывода 1000

Макс. число потоков в пуле 2047, число доступ. потоков 2043, число ассинх. пот. ввода-вывода 1000

Макс. число потоков в пуле 2047, число доступ. потоков 2047, число ассинх. пот. ввода-вывода 1000

Времявыполнения 10 потоков 344

i: 0, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4

i: 1, ThreadID: 4, IsPoolThread:True, ThreadsInPool: 4

i: 2, ThreadID: 5, IsPoolThread:True, ThreadsInPool: 4

i: 3, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4

i: 4, ThreadID: 5, IsPoolThread:True, ThreadsInPool: 4

i: 5, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4

i: 6, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4

i: 7, ThreadID: 4, IsPoolThread:True, ThreadsInPool: 4

i: 8, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4

i: 9, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4


Понравилась статья? Добавь ее в закладку (CTRL+D) и не забудь поделиться с друзьями:  



double arrow
Сейчас читают про: