|
||||
|
Глава 13. Потоки в Ruby
Потоки еще иногда называют облегченными процессами. Это просто способ обеспечить параллельное выполнение без накладных расходов, связанных с контекстным переключением между процессами. (Впрочем, общего согласия по поводу того, что такое поток, нет, поэтому мы не будем углубляться в данный вопрос.) В Ruby потоки определены на пользовательском уровне и не зависят от операционной системы. Они работают в DOS так же, как и в UNIX. Но, конечно, это снижает производительность, а на сколько именно, зависит от операционной системы. Потоки полезны, например, тогда, когда некоторые части программы могут работать независимо друг от друга. Применяются они и в тех случаях, когда приложение тратит много времени на ожидание события. Часто, пока один поток ждет, другой может выполнять полезную работу. С другой стороны, у потоков есть и недостатки. Всегда надо взвешивать, оправданно ли их применение в конкретном случае. К тому же, иногда доступ к ресурсу принципиально должен осуществляться строго последовательно, поэтому потоки не дадут никакого выигрыша. И, наконец, бывает так, что накладные расходы на синхронизацию доступа к глобальным ресурсам превышают экономию, достигаемую за счет использования нескольких потоков. По этой и ряду других причин некоторые авторитеты вообще рекомендуют держаться подальше от многопоточного программирования. Действительно, такие программы сложны и подвержены ошибкам, которые трудно отлаживать. Но мы оставим читателю самому решать, когда стоит применять эту технику. Проблемы, связанные с несинхронизированными потоками, хорошо известны. При одновременном доступе к глобальным данным со стороны нескольких потоков данные могут быть запорчены. Если один поток делает какое-то допущение о том, что успел выполнить другой поток, возможна гонка (race condition); обычно это приводит к «недетерминированному» коду, который дает разные результаты при каждом запуске. Наконец, существует опасность тупиковой ситуации, когда ни один поток не может продолжить выполнение, поскольку ожидает ресурс, занятый другим потоком. Код, написанный так, что ни одна из этих проблем не возникает, называется безопасным относительно потоков. Не все в Ruby безопасно относительно потоков, но имеются методы синхронизации, которые позволяют контролировать доступ к переменным и ресурсам, защищать критические секции программы и избегать тупиковых ситуаций. Мы рассмотрим их в этой главе и проиллюстрируем на примерах. 13.1. Создание потоков и манипулирование имиК числу основных операций над потоками относятся создание потока, передача ему входной информации и получение результатов, останов потока и т.д. Можно получить список запущенных потоков, опросить состояние потока и выполнить ряд других проверок. Ниже представлен обзор основных операций. 13.1.1. Создание потоковСоздать поток просто: достаточно вызвать метод new и присоединить блок, который будет исполняться в потоке. thread = Thread.new do # Предложения, исполняемые в потоке... end Возвращаемое значение — объект типа Thread. Главный поток программы может использовать его для управления вновь созданным потоком. А если нужно передать потоку параметры? Достаточно передать их методу Thread.new, который, в свою очередь, передаст их блоку. a = 4 b = 5 с = 6 thread2 = Thread.new(а,b,с) do |a, x, y| # Манипуляции с a, x и y. end # Если переменная а будет изменена новым потоком, # то главный поток не получит об этом никакого уведомления. Параметры блока, являющиеся ссылками на существующие переменные, практически неотличимы от самих переменных. Поэтому, например, переменная ав каком-то смысле «опасна», что и отражено в комментарии. Поток может также обращаться к переменным из той же области видимости, в которой был создан. Ясно, что без синхронизации это может стать источником проблем. Главный и любой другой поток могут изменять такую переменную независимо друг от друга, и результаты подобных действий непредсказуемы. x = 1 y = 2 thread3 = Thread.new do # Этот поток может манипулировать переменными x and y # из внешней области видимости, но это не всегда безопасно. sleep(rand(0)) # Спать в течение случайно выбранного времени # (меньше секунды). x = 3 end sleep(rand(0)) puts x # Если запустить эту программу несколько раз подряд, то может быть # напечатано как 1, так и 3! У метода newесть синоним fork— это имя выбрано по аналогии с хорошо известным системным вызовом в UNIX. 13.1.2. Доступ к локальным переменным потокаМы знаем об опасности доступа из потока к переменным, определенным вне его области видимости, но мы также знаем, что у потока могут быть локальные данные. А что делать, если поток хочет «обнародовать» часть принадлежащих ему данных? Для этой цели предусмотрен специальный механизм. Если объект Threadрассматривать как хэш, то к локальным данным потока можно обратиться из любого места в области видимости этого объекта. Мы не хотим сказать, что так можно обратиться к настоящим локальным переменным; это допустимо лишь для доступа к именованным данным, своим для каждого потока. Существует также метод key?, который сообщает, используется ли указанное имя в данном потоке. Внутри потока к таким данным тоже следует обращаться, как к хэшу. Метод Thread.currentпозволяет сделать запись чуть менее громоздкой. thread = Thread.new do t = Thread.current t[:var1] = "Это строка" t[:var2] = 365 end # Доступ к локальным данным потока извне... x = thread[:var1] # "Это строка" y = thread[:var2] # 365 has_var2 = thread.key?("var2") # true has_var3 = thread.key?("var3") # false Отметим, что эти данные доступны другим потокам даже после того, их владелец завершил работу (как в данном случае). Помимо символа (см. выше), для идентификации локальной переменной потока можно употреблять и строки. thread = Thread.new do t = Thread.current t["var3"] = 25 t[:var4] = "foobar" end a = thread[:var3] = 25 b = thread["var4"] = "foobar" He путайте эти специальные имена с настоящими локальными переменными. В следующем фрагменте разница видна более отчетливо: thread = Thread.new do t = Thread.current t["var3"] = 25 t[:var4] = "foobar" var3 = 99 # Настоящие локальные переменные var4 = "zorch" # (извне недоступны) end a = thread[:var3] # 25 b = thread["var4"] # "foobar" И еще отметим, что ссылку на объект (на настоящую локальную переменную) внутри потока можно использовать для сокращенной записи. Это справедливо, если вы сохраняете одну и ту же ссылку, а не создаете новую. thread = Thread.new do t = Thread.current x = "nXxeQPdMdxiBAxh" t[:my_message] = x x.reverse! x.delete! "x" x.gsub!(/[A-Z]/,"") # С другой стороны, присваивание создает новый объект, # поэтому "сокращение" становится бесполезным... end а = thread[:my_message] # "hidden" Ясно, что сокращение не будет работать и в том случае, когда вы имеете дело с объектами наподобие Fixnum, которые хранятся как непосредственные значения, а не ссылки. 13.1.3. Опрос и изменение состояния потокаВ классе Threadесть несколько полезных методов класса. Метод listвозвращает массив «живых» потоков, метод mainвозвращает ссылку на главный поток программы, который породил все остальные, а метод currentпозволяет потоку идентифицировать самого себя. t1 = Thread.new { sleep 100 } t2 = Thread.new do if Thread.current == Thread.main puts "Это главный поток." # HE печатается, end 1.upto(1000) { sleep 0.1 } end count = Thread.list.size # 3 if Thread.list.include ?(Thread.main) puts "Главный поток жив." # Печатается всегда! end if Thread.current == Thread.main puts "Я главный поток." # Здесь печатается... end Методы exit, pass, start, stopи killслужат для управления выполнением потоков (как изнутри, так и извне): # в главном потоке... Thread.kill(t1) # Завершить этот поток. Thread.pass # Передать управление t2. t3 = Thread.new do sleep 20 Thread.exit # Выйти из потока. puts "Так не бывает!" # Никогда не выполняется. end Thread.kill(t2) # Завершить t2. # Выйти из главного потока (все остальные тоже завершаются). Thread.exit Отметим, что не существует метода экземпляра stop, поэтому поток может приостановить собственное выполнение, но не выполнение другого потока. Существуют различные методы для опроса состояния потока. Метод экземпляра alive?сообщает, является ли данный поток «живым» (не завершил выполнение), а метод stop?— находится ли он в состоянии «приостановлен». count = 0 t1 = Thread.new { loop { count += 1 } } t2 = Thread.new { Thread.stop } sleep 1 flags = [t1.alive?, # true t1.stop?, # false t2.alive?, # true t2.stop?] # true Получить состояние потока позволяет метод status. Он возвращает значение "run", если поток выполняется; "sleep"— если он приостановлен, спит или ожидает результата ввода/вывода; false— если поток нормально завершился, и nil— если поток завершился в результате исключения. t1 = Thread.new { loop {} } t2 = Thread.new { sleep 5 } t3 = Thread.new { Thread.stop } t4 = Thread.new { Thread.exit } t5 = Thread.new { raise "exception" } s1 = t1.status # "run" s2 = t2.status # "sleep" s3 = t3.status # "sleep" s4 = t4.status # false s5 = t5.status # nil Глобальную переменную $SAFEможно установить по-разному в разных потоках. Стало быть, она вовсе не является глобальной, но стоит ли жаловаться на это, если она позволяет разным потокам работать с разным уровнем безопасности? Метод safe_levelвозвращает текущий уровень безопасности потока. t1 = Thread.new { $SAFE = 1; sleep 5 } t2 = Thread.new { $SAFE = 3; sleep 5 } sleep 1 lev0 = Thread.main.safe_level # 0 lev1 = t1.safe_level # 1 lev2 = t2.safe_level # 3 Метод доступа priorityпозволяет узнать и изменить приоритет потока: t1 = Thread.new { loop { sleep 1 } } t2 = Thread.new { loop { sleep 1 } } t2.priority = 3 # Установить для потока t2 приоритет 3 p1 = t1.priority # 0 p2 = t2.priority # 3 Поток с большим приоритетом будет чаще получать процессорное время. Специальный метод passпозволяет передать управление планировщику. Иными словами, поток просто уступает свой временной квант, но не приостанавливается и не засыпает. t1 = Thread.new do puts "alpha" Thread.pass puts "beta" end t2 = Thread.new do puts "gamma" puts "delta" end t1.join t2.join В этом искусственном примере вызов Thread.passприводит к печати строк в следующем порядке: alpha gamma delta beta. Без него было бы напечатано alpha beta gamma delta. Конечно, этот механизм следует использовать не для синхронизации, а только для экономного расходования процессорного времени. Выполнение приостановленного потока можно возобновить методами методами runили wakeup: t1 = Thread.new do Thread.stop puts "Здесь есть изумруд." end t2 = Thread.new do Thread.stop puts "Вы находитесь в точке Y2." end sleep 1 t1.wakeup t2.run Между этими методами есть тонкое различие. Метод wakeupизменяет состояние потока, так что он становится готовым к выполнению, но не запускает его немедленно. Метод же runпробуждает поток и сразу же планирует его выполнение. В данном случае t1просыпается раньше t2, но t2планируется первым, что приводит к следующему результату: Вы находитесь в точке Y2. Здесь есть изумруд. Конечно, было бы неосмотрительно реализовывать синхронизацию на основе этого механизма. Метод экземпляра raiseвозбуждает исключение в потоке, от имени которого вызван. (Этот метод необязательно вызывать в том потоке, которому адресовано исключение.) factorial1000 = Thread.new do begin prod = 1 1.upto(1000) {|n| prod *= n } puts "1000! = #{prod}" rescue # Ничего не делать... end end sleep 0.01 # На вашей машине значение может быть иным. if factorial1000.alive? factorial1000.raise("Стоп!") puts "Вычисление было прервано!" else puts "Вычисление успешно завершено." end Поток, запущенный в предыдущем примере, пытался вычислить факториал 1000. Если для этого не хватило одной сотой секунды, то главный поток завершит его. Как следствие, на относительно медленной машине будет напечатано сообщение «Вычисление было прервано!» Что касается части rescueвнутри потока, то в ней мог бы находиться любой код, как, впрочем, и всегда. 13.1.4. Назначение рандеву (и получение возвращенного значения)Иногда главный поток хочет дождаться завершения другого потока. Для этой цели предназначен метод join: t1 = Thread.new { do_something_long() } do_something_brief() t1.join # Ждать завершения t1. Отметим, что вызывать метод joinнеобходимо, если нужно дождаться завершения другого потока. В противном случае главный поток завершится, а вместе с ним и все остальные. Например, следующий код никогда не напечатал бы окончательный ответ, не будь в конце вызова join: meaning_of_life = Thread.new do puts "Смысл жизни заключается в..." sleep 10 puts 42 end sleep 9 meaning_of_life.join Существует полезная идиома, позволяющая вызвать метод joinдля всех «живых» потоков, кроме главного (ни один поток, даже главный, не может вызывать joinдля самого себя). Thread.list.each { |t| t.join if t != Thread.main } Конечно, любой поток, а не только главный, может вызвать joinдля любого другого потока. Если главный поток и какой-то другой попытаются вызвать joinдруг для друга, возникнет тупиковая ситуация. Интерпретатор обнаружит это и завершит программу. thr = Thread.new { sleep 1; Thread.main.join } thr.join # Тупиковая ситуация! С потоком связан блок, который может возвращать значение. Следовательно, и сам поток может возвращать значение. Метод valueнеявно вызывает joinи ждет, пока указанный поток завершится, а потом возвращает значение последнего вычисленного в потоке выражения. max = 10000 thr = Thread.new do sum = 0 1.upto(max) { |i| sum += i } sum end guess = (max*(max+1))/2 print "Формула " if guess == thr.value puts "правильна." else puts "неправильна." end 13.1.5. Обработка исключенийЧто произойдет, если в потоке возникнет исключение? Как выясняется, поведение можно сконфигурировать заранее. Существует флаг abort_on_exception, который работает как на уровне класса, так и на уровне экземпляра. Он реализован в виде метода доступа (то есть позволяет читать и устанавливать атрибут) на обоих уровнях. Если abort_on_exceptionдля некоторого потока равен true, то при возникновении в этом потоке исключения будут завершены и все остальные потоки. Thread.abort_on_exception = true t1 = Thread.new do puts "Привет!" sleep 2 raise "some exception" puts "Пока!" end t2 = Thread.new { sleep 100 } sleep 2 puts "Конец" В этом примере флаг abort_on_exceptionустановлен в trueна уровне системы в целом (отменяя подразумеваемое по умолчанию значение). Следовательно, когда в потоке t1возникает исключение, завершаются и t1, и главный поток. Печатается только слово «Привет!». В следующем примере эффект такой же: t1 = Thread.new do puts "Привет!" sleep 2 raise "some exception" puts "Пока!" end t1.abort_on_exception = true t2 = Thread.new { sleep 100 } sleep 2 puts "Конец" А вот в следующем оставлено принимаемое по умолчанию значение false, и мы наконец-то видим слово «Конец», печатаемое главным потоком (слова «Пока!» мы не увидим никогда, поскольку поток t1при возникновении исключения завершается безусловно). t1 = Thread.new do puts "Привет!" sleep 2 raise "some exception" puts "Пока!" end t2 = Thread.new { sleep 100 } sleep 2 puts "Конец" # Выводится: Привет! Конец 13.1.6. Группы потоковГруппа потоков — это механизм управления логически связанными потоками. По умолчанию все потоки принадлежат группе Default(это константа класса). Но если создать новую группу, то в нее можно будет помещать потоки. В любой момент времени поток может принадлежать только одной группе. Если поток помещается в группу, то он автоматически удаляется из той группы, которой принадлежал ранее. Метод класса ThreadGroup.newсоздает новую группу потоков, а метод экземпляра addпомещает поток в группу. f1 = Thread.new("file1") { |file| waitfor(file) } f2 = Thread.new("file2") { |file| waitfor(file) } file_threads = ThreadGroup.new file_threads.add f1 file_threads.add f2 Метод экземпляра listвозвращает массив всех потоков, принадлежащих данной группе. # Подсчитать все "живые" потоки в группе this_group. count = 0 this_group.list.each {|x| count += 1 if x.alive? } if count < this_group.list.size puts "Некоторые потоки в группе this_group уже скончались." else puts "Все потоки в группе this_group живы." end В класс ThreadGroupможно добавить немало полезных методов. В примере ниже показаны методы для возобновления всех потоков, принадлежащих группе, для группового ожидания потоков (с помощью join) и для группового завершения потоков: class ThreadGroup def wakeup list.each { |t| t.wakeup } end def join list.each { |t| t.join if t != Thread.current } end def kill list.each { |t| t.kill } end end 13.2. Синхронизация потоковПочему необходима синхронизация? Потому что из-за «чередования» операций доступ к переменным и другим сущностям может осуществляться в порядке, который не удается установить путем чтения исходного текста отдельных потоков. Два и более потоков, обращающихся к одной и той же переменной, могут взаимодействовать между собой непредвиденными способами, и отлаживать такую программу очень трудно. Рассмотрим простой пример: x = 0 t1 = Thread.new do 1.upto(1000) do x = x + 1 end end t2 = Thread.new do 1.upto(1000) do x = x + 1 end end t1.join t2.join puts x Сначала переменная xравна 0. Каждый поток увеличивает ее значение на тысячу раз. Логика подсказывает, что в конце должно быть напечатано 2000. Но фактический результат противоречит логике. На конкретной машине было напечатано значение 1044. В чем дело? Мы предполагали, что инкремент целого числа — атомарная (неделимая) операция. Но это не так. Рассмотрим последовательность выполнения приведенной выше программы. Поместим поток t1слева, а поток t2справа. Каждый квант времени занимает одну строчку и предполагается, что к моменту, когда был сделан этот мгновенный снимок, переменная xимела значение 123. t1 t2 -------------------------- ----------------------------- Прочитать значение x (123) Прочитать значение x (123) Увеличить значение на 1 (124) Увеличить значение на 1 (124) Записать результат в x Записать результат в x Ясно, что каждый поток увеличивает на 1 то значение, которое видит. Но не менее ясно и то, что после увеличения на 1 обоими потоками xоказалось равно всего 124. И это лишь самая простая из проблем, возникающих в связи с синхронизацией. Для решения более сложных приходится прилагать серьезные усилия — это предмет изучения специалистами в области теоретической информатики и математики. 13.2.1. Синхронизация с помощью критических секцийПростейший способ синхронизации дают критические секции. Когда поток входит в критическую секцию программы, гарантируется, что никакой другой поток не войдет в нее, пока первый не выйдет. Если акцессору Thread.criticalприсвоить значение true, то выполнение других потоков не будет планироваться. В следующем примере мы переработали код предыдущего, воспользовавшись акцессором criticalдля определения критической области, которая защищает уязвимые участки программы. x = 0 t1 = Thread.new do 1.upto(1000) do Thread.critical = true x = x + 1 Thread.critical = false end end t2 = Thread.new do 1.upto(1000) do Thread.critical = true x = x + 1 Thread.critical = false end end t1.join t2.join puts x Теперь последовательность выполнения изменилась; взгляните, в каком порядке работают потоки t1и t2. (Конечно, вне того участка, где происходит увеличение переменной, потоки могут чередоваться более-менее случайным образом.) t1 t2 ----------------------------- ----------------------------- Прочитать значение x (123) Увеличить значение на 1 (124) Записать результат в x Прочитать значение x (124) Увеличить значение на 1 (125) Записать результат в x Возможны такие комбинации операций с потоками, при которых поток планируется даже тогда, когда какой-то другой поток находится в критической секции. Простейший случай — вновь созданный поток начинает исполнение немедленно вне зависимости от того, занимает какой-то другой поток критическую секцию или нет. Поэтому описанную технику лучше применять только в самых простых ситуациях. 13.2.2. Синхронизация доступа к ресурсам (mutex.rb)В качестве примера рассмотрим задачу индексирования Web-сайтов. Мы извлекаем слова из многочисленных страниц в Сети и сохраняем их в хэше. Ключом является само слово, а значением — строка, идентифицирующая документ и номер строки в этом документе. Постановка задачи и так достаточно груба. Но мы огрубим ее еще больше, введя следующие упрощающие допущения: • будем представлять удаленные документы в виде строк; • ограничимся всего тремя строками (они будут «зашиты» в код); • сетевые задержки будем моделировать «засыпанием» на случайный промежуток времени. Взгляните на программу в листинге 13.1. Она даже не печатает получаемые данные целиком, а выводит лишь счетчик слов (не уникальный). Каждый раз при чтении или обновлении хэша мы вызываем метод hesitate, который приостанавливает поток на случайное время. Тем самым поведение программы становится недетерминированным и приближенным к реальности. Листинг 13.1. Программа индексирования с ошибками (гонка) @list = [] @list[0]="shoes ships\nsealing-wax" @list[1]="cabbages kings" @list[2]="quarks\nships\ncabbages" def hesitate sleep rand(0) end @hash = {} def process_list(listnum) lnum = 0 @list[listnum].each do |line| words = line.chomp.split words.each do |w| hesitate if @hash[w] hesitate @hash[w] += ["#{listnum}:#{lnum}"] else hesitate @hash[w] = ["#{listnum}:#{lnum}"] end end lnum += 1 end end t1 = Thread.new(0) {|num| process_list(num) } t2 = Thread.new(1) {|num| process_list(num) } t3 = Thread.new(2) {|num| process_list(num) } t1.join t2.join t3.join count = 0 @hash.values.each {|v| count += v.size } puts "Всего слов: #{count} " # Может быть напечатано 7 или 8! Здесь имеется проблема. Если ваша система ведет себя примерно так же, как наша, то программа может напечатать одно из двух значений! В наших тестах с одинаковой вероятностью печаталось 7 или 8. Если слов и списков больше, то и разброс окажется более широким. Попробуем исправить положение с помощью мьютекса, который будет контролировать доступ к разделяемому ресурсу. (Слово «mutex» — это сокращение от mutual exclusion, «взаимная блокировка».) Обратимся к листингу 13.2. Библиотека Mutexпозволяет создавать мьютексы и манипулировать ими. Мы можем захватить (lock) мьютекс перед доступом к хэшу и освободить (unlock) его по завершении операции. Листинг 13.2. Программа индексирования с мьютексом require 'thread.rb' @list = [] @list[0]="shoes ships\nsealing-wax" @list[1]="cabbages kings" @list[2]="quarks\nships\ncabbages" def hesitate sleep rand(0) end @hash = {} @mutex = Mutex.new def process_list(listnum) lnum = 0 @list[listnum].each do |line| words = line.chomp.split words.each do |w| hesitate @mutex.lock if @hash[w] hesitate @hash[w] += ["#{listnum}:#{lnum}"] else hesitate @hash[w] = ["#{listnum}:#{lnum}"] end @mutex.unlock end lnum += 1 end end t1 = Thread.new(0) {|num| process_list(num) } t2 = Thread.new(1) {|num| process_list(num) } t3 = Thread.new(2) {|num| process_list(num) } t1.join t2.join t3.join count = 0 @hash.values.each {|v| count += v.size } puts "Всего слов: #{count} " # Всегда печатается 8! Отметим, что помимо метода lockв классе Mutexесть также метод try_lock. Он отличается от lockтем, что если мьютекс уже захвачен другим потоком, то он не дожидается освобождения, а сразу возвращает false. require 'thread' mutex = Mutex.new t1 = Thread.new { mutex.lock; sleep 30 } sleep 1 t2 = Thread.new do if mutex.try_lock puts "Захватил" else puts "He сумел захватить" # Печатается немедленно. end end sleep 2 Эта возможность полезна, если поток не хочет приостанавливать выполнение. Есть также метод synchronize, который захватывает мьютекс, а потом автоматически освобождает его. mutex = Mutex.new mutex.synchronize do # Любой код, нуждающийся в защите... end Существует еще библиотека mutex_m, где определен модуль Mutex_m, который можно подмешивать к классу (или использовать для расширения объекта). У такого расширенного объекта будут все методы мьютекса, так что он сам может выступать в роли мьютекса. require 'mutex_m' class MyClass include Mutex_m # Теперь любой объект класса MyClass может вызывать # методы lock, unlock, synchronize... # Внешние объекты также могут вызывать эти # методы для объекта MyClass. end 13.2.3. Предопределенные классы синхронизированных очередейВ библиотеке thread.rbесть пара классов, которые иногда бывают полезны. Класс Queueреализует безопасную относительно потоков очередь, доступ к обоим концам которой синхронизирован. Это означает, что разные потоки могут, ничего не опасаясь, работать с такой очередью. Класс SizedQueueотличается от предыдущего тем, что позволяет ограничить размер очереди (число элементов в ней). Оба класса имеют практически один и тот же набор методов, поскольку SizedQueueнаследует Queue. Правда, в подклассе определен еще акцессор max, позволяющий получить и установить максимальный размер очереди. buff = SizedQueue.new(25) upper1 = buff.max #25 # Увеличить размер очереди... buff.max = 50 upper2 = buff.max # 50 В листинге 13.3 приведено решение задачи о производителе и потребителе. Для производителя задержка (аргумент sleep) чуть больше, чем для потребителя, чтобы единицы продукции «накапливались». Листинг 13.3. Задача о производителе и потребителеrequire 'thread' buffer = SizedQueue.new(2) producer = Thread.new do item = 0 loop do sleep rand 0 puts "Производитель произвел #{item}" buffer.enq item item += 1 end end consumer = Thread.new do loop do sleep (rand 0)+0.9 item = buffer.deq puts "Потребитель потребил #{item}" puts " ожидает = #{buffer.num_waiting}" end end sleep 60 # Работать одну минуту, потом завершить оба потока. Чтобы поместить элемент в очередь и извлечь из нее, рекомендуется применять соответственно методы enqи deq. Можно было бы для помещения в очередь пользоваться также методом push, а для извлечения — методами popи shift, но их названия не так мнемоничны в применении к очередям. Метод empty?проверяет, пуста ли очередь, а метод clearопустошает ее. Метод size(и его синоним length) возвращает число элементов в очереди. # Предполагается, что другие потоки не мешают... buff = Queue.new buff.enq "one" buff.enq "two" buff.enq "three" n1 = buff.size # 3 flag1 = buff.empty? # false buff.clear n2 = buff.size # 0 flag2 = buff.empty? # true Метод num_waitingвозвращает число потоков, ожидающих доступа к очереди. Если размер очереди не ограничен, то это потоки, ожидающие возможности удалить элементы; для ограниченной очереди включаются также потоки, пытающиеся добавить элементы. Необязательный параметр non_blockметода deqв классе Queueпо умолчанию равен false. Если же он равен true, по при попытке извлечь элемент из пустой очереди он не блокирует поток, а возбуждает исключение ThreadError. 13.2.4. Условные переменные
Условная переменная — это, по существу, очередь потоков. Они используются в сочетании с мьютексами для лучшего управления синхронизацией потоков. Условная переменная всегда ассоциируется с каким-то мьютексом. Ее назначение — освободить мьютекс до тех пор, пока не начнет выполняться определенное условие. Представьте себе ситуацию, когда поток захватил мьютекс, но не готов продолжать выполнение. Тогда он может заснуть под контролем условной переменной, ожидая, что будет разбужен, когда условие станет истинным. Важно понимать, что пока поток ждет условную переменную, мьютекс свободен, поэтому другие потоки могут получить доступ к защищенному им ресурсу. А как только другой поток сигнализирует этой переменной, ожидающий поток пробуждается и пытается вновь захватить мьютекс. Рассмотрим несколько искусственный пример в духе задачи об обедающих философах. Представьте себе, что вокруг стола сидят три скрипача, ожидающих своей очереди поиграть. Но у них есть всего две скрипки и один смычок. Понятно, что скрипач сможет играть, только если одновременно завладеет одной из скрипок и смычком. Мы поддерживаем счетчики свободных скрипок и смычков. Когда скрипач хочет получить скрипку и смычок, он должен ждать их освобождения. В программе ниже мы защитили проверку условия мьютексом и под его защитой ждем скрипку и смычок порознь. Если скрипка или смычок заняты, поток засыпает. Он не владеет мьютексом до тех пор, пока другой поток не просигнализирует о том, что ресурс свободен. В этот момент первый поток просыпается и снова захватывает мьютекс. Код представлен в листинге 13.4. Листинг 13.4. Три скрипачаrequire 'thread' @music = Mutex.new @violin = ConditionVariable.new @bow = ConditionVariable.new @violins_free = 2 @bows_free = 1 def musician(n) loop do sleep rand(0) @music.synchronize do @violin.wait(@music) while @violins_frее == 0 @violins_free -= 1 puts "#{n} владеет скрипкой" puts "скрипок #@violins_frее, смычков #@bows_free" @bow.wait(@music) while @bows_free == 0 @bows_free -= 1 puts "#{n} владеет смычком" puts "скрипок #@violins_free, смычков #@bows_free" end sleep rand(0) puts "#{n}: (...играет...)" sleep rand(0) puts "#{n}: Я закончил." @music.synchronize do @violins_free += 1 @violin.signal if @violins_free == 1 @bows_free += 1 @bow.signal if @bows_free == 1 end end end threads = [] 3.times {|i| threads << Thread.new { musician(i) } } threads.each {|t| t.join } Мы полагаем, что это решение никогда не приводит к тупиковой ситуации, хотя доказать этого не сумели. Но интересно отметить, что описанный алгоритм не справедливый. В наших тестах оказалось, что первый скрипач играет чаще двух остальных, а второй чаще третьего. Выяснение причин такого поведения и его исправление мы оставляем читателю в качестве интересного упражнения. 13.2.5. Другие способы синхронизацииЕще один механизм синхронизации - это монитор, который в Ruby реализован в библиотеке monitor.rb. Это более развитый по сравнению с мьютексом механизм, основное отличие состоит в том, что захваты одного и того же мьютекса не могут быть вложенными, а монитора — могут. Тривиальный случай возникновения такой ситуации вряд ли возможен. В самом деле, кто станет писать такой код: @mutex = Mutex.new @mutex.synchronize do @mutex.synchronize do #... end end Но нечто подобное может произойти в сложной программе (или при рекурсивном вызове метода). Какова бы ни была причина, последствием будет тупиковая ситуация. Уход от нее — одно из достоинств модуля-примеси Monitor. @mutex = Mutex.new def some_method @mutex.synchronize do #... some_other_method # Тупиковая ситуация! end end def some_other_method @mutex.synchronize do #... end end Модуль-примесь Monitorобычно применяется для расширения объекта. Для создания условной переменной предназначен метод new_cond. Класс ConditionVariableв библиотеке monitor.rbдополнен по сравнению с определением в библиотеке thread. У него есть методы wait_untilи wait_while, которые блокируют поток в ожидании выполнения условия. Кроме того, возможен тайм-аут при ожидании, поскольку у метода waitимеется параметр timeout, равный количеству секунд (по умолчанию nil). Поскольку примеры работы с потоками у нас кончаются, то в листинге 13.5 мы предлагаем реализацию классов Queueи SizedQueueс помощью монитора. Код приводится с разрешения автора, Шуго Маэда (Shugo Maeda). Листинг 13.5. Реализация класса Queue с помощью монитора # Автор: Shugo Maeda require 'monitor' class Queue def initialize @que = [] @monitor = Monitor.new @empty_cond = @monitor.new_cond end def enq(obj) @monitor.synchronize do @que.push(obj) @empty_cond.signal end end def deq @monitor.synchronize do while @que.empty? @empty_cond.wait end return @que.shift end end end class SizedQueue < Queue attr :max def initialize(max) super() @max = max @full_cond = @monitor.new_cond end def enq(obj) @monitor.synchronize do while @que.length >= @max @full_cond.wait end super(obj) end end def deq @monitor.synchronize do obj = super if @que.length < @max @full_cond.signal end return obj end end def max=(max) @monitor.synchronize do @max = max @full_cond.broadcast end end end Еще один вариант синхронизации (двузначную блокировку со счетчиком) предлагает библиотека sync.rb. В ней определен модуль Sync_m, который можно применять вместе с ключевыми словами includeи extend(как и Mutex_m). Этот модуль содержит методы locked?, shared?, exclusive?, lock, unlockи try_lock. 13.2.6. Тайм-аут при выполнении операцийЧасто встречается ситуация, когда на выполнение операции отводится определенное максимальное время. Это позволяет избежать бесконечных циклов и более строго контролировать порядок работы. Подобная возможность очень полезна, в частности, в сетевых приложениях, где ответ от сервера может и не прийти. Библиотека timeout.rbпредлагает решение этой проблемы на основе потоков (см. листинг 13.6). С методом timeoutассоциирован выполняемый блок. Если истечет заданное число секунд, метод возбуждает исключение TimeoutError, которое можно перехватить с помощью rescue. Листинг 13.6. Пример тайм-аута require 'timeout.rb' flag = false answer = nil begin timeout(5) do puts "Хочу печенье!" answer = gets.chomp flag = true end rescue TimeoutError flag = false end if flag if answer == "cookie" puts "Спасибо! Хрум, хрум..." else puts "Это же не печенье!" exit end else puts "Эй, слишком медленно!" exit end puts "До встречи..." 13.2.7. Ожидание событияЧасто один или несколько потоков следят за «внешним миром», а остальные выполняют полезную работу. Все примеры в этом разделе надуманные, но общий принцип они все же иллюстрируют. В следующем примере прикладную задачу решают три потока. Четвертый поток каждые пять секунд просыпается, проверяет глобальную переменную $flagи, когда видит, что флаг поднят, пробуждает еще два потока. Это освобождает три рабочих потока от необходимости напрямую общаться с двумя другими и, возможно, от многочисленных попыток разбудить их. $flag = false work1 = Thread.new { job1() } work2 = Thread.new { job2() } work3 = Thread.new { job3() } thread4 = Thread.new { Thread.stop; job4() } thread5 = Thread.new { Thread.stop; job5() } watcher = Thread.new do loop do sleep 5 if $flag thread4.wakeup thread5.wakeup Thread.exit end end end Если в какой-то момент выполнения метода job, переменная $flagстанет равной true, то в течение пяти секунд после этого потоки thread4и thread5гарантированно запустятся. После этого поток watcherзавершается. В следующем примере мы ждем создания файла. Каждые 30 секунд проверяется его существование, и как только файл появится, мы запускаем новый поток. Тем временем остальные потоки занимаются своим делом. На самом деле ниже мы наблюдаем за тремя разными файлами. def waitfor(filename) loop do if File.exist? filename file_processor = Thread.new { process_file(filename) } Thread.exit else sleep 30 end end end waiter1 = Thread.new { waitfor("Godot") } sleep 10 waiter2 = Thread.new { waitfor("Guffman") } sleep 10 headwaiter = Thread.new { waitfor("head") } # Основной поток занимается другими делами... Есть много ситуаций, когда поток должен ожидать внешнего события (например, в сетевых приложениях так бывает, когда сервер на другом конце соединения работает медленно или ненадежно). 13.2.8. Продолжение обработки во время ввода/выводаЧасто приложению приходится выполнять одну или более длительных операций ввода/вывода. Прежде всего, речь идет о вводе данных с клавиатуры, поскольку человек печатает куда медленнее, чем вращается диск. Это время можно употребить на пользу с помощью потоков. Возьмем, к примеру, шахматную программу, которая должна ждать, пока человек сделает ход. Конечно, мы можем изложить только сам принцип, не вдаваясь в технические детали. Предположим, что итератор predict_moveгенерирует вероятные ходы человека (и ответные ходы программы). Тогда в момент, когда человек сделает ход, не исключено, что у компьютера уже будет готов ответ. scenario = {} # Хэш ход-ответ. humans_turn = true thinking_ahead = Thread.new(board) do predict_move do |m| scenario[m] = my_response(board,m) Thread.exit if humans_turn == false end end human_move = get_human_move(board) humans_turn = false # Остановить поток. # Теперь можно посмотреть, нет ли в хэше scenario хода, # сделанного пользователем... Конечно, настоящие шахматные программы работают не так. 13.2.9. Реализация параллельных итераторовПредположим, что нужно параллельно обходить несколько объектов, то есть для каждого объекта найти первый элемент, потом второй, потом третий и т.д. Рассмотрим следующий пример. Пусть compose— имя магического метода, который выполняет композицию итераторов. Допустим еще, что у каждого объекта есть стандартный итератор eachи что каждый объект возвращает по одному элементу на каждой итерации. arr1 = [1, 2, 3, 4] arr2 = [5, 10, 15, 20] compose(arr1, arr2) {|a,b| puts "#{а} и #{b}" } # Должно быть напечатано: # 1 и 5 # 2 и 10 # 3 и 15 # 4 и 20 Можно было бы, конечно, использовать для этой цели zip. Но если нужно более элегантное решение, при котором все элементы не будут храниться одновременно, то без потоков не обойтись. Такое решение представлено в листинге 13.7. Листинг 13.7. Параллельные итераторы def compose(*objects) threads = [] for obj in objects do threads << Thread.new(obj) do |myobj| me = Thread.current me[:queue] = [] myobj.each {|x| me[:queue].push(x) } end end list = [0] # Фиктивное значение, отличное от nil. while list.nitems > 0 do # Еще есть не nil. list = [] for thr in threads list << thr[:queue].shift # Удалить по одному из каждого. end yield list if list.nitems > 0 # He вызывать yield, если все равны nil. end end x = [1, 2, 3, 4, 5, 6, 7, 8] y = " первый\n второй\n третий\n четвертый\n пятый\n" z = %w[a b с d e f] compose(x, у, z) {|a,b,c| p [a, b, c] } # Выводится: # [1, " первый\n", "a"] # [2, " второй\n", "b"] # [3, " третий\n", "c"] # [4, " четвертый\n", "d"] # [5, " пятый\n", "e"] # [6, nil, "f"] # [7, nil, nil] # [8, nil, nil] Обратите внимание: мы не предполагаем, что все объекты имеют одно и то же число элементов. Если один итератор доходит до конца раньше остальных, то он будет генерировать значения nilдо тех пор, пока не закончит работу «самый длинный» итератор. Конечно, можно написать и более общий метод, который на каждой итерации будет обрабатывать более одного элемента. (В конце концов, не все итераторы возвращают по одному значению за раз.) Можно было бы в первом параметре передавать число значений для каждого итератора. Можно также пользоваться произвольными итераторами (а не только стандартным each). Их имена можно было бы передавать в виде строк, а вызывать с помощью метода send. Много чего еще можно придумать. Впрочем, мы полагаем, что приведенного кода достаточно для большинства целей. Вариации на эту тему оставляем читателю в качестве упражнения. 13.2.10. Параллельное рекурсивное удалениеЗабавы ради напишем код, который будет удалять дерево каталогов. Процедура рекурсивного удаления использует потоки. Как только обнаруживается очередной подкаталог, мы запускаем новый поток, который будет обходить его и удалять содержимое. Созданные в ходе работы программы потоки хранятся в массиве threads. Поскольку это локальная переменная, у каждого потока будет собственная копия массива. Раз к ней может обращаться всего один поток, синхронизировать доступ не надо. Отметим также, что в блок потока передается полное имя файла fullname, чтобы не нужно было беспокоиться по поводу того, что поток обращается к переменной, которую кто-то еще изменяет. Поток делает для себя локальную копию fnэтой переменной. Прежде чем удалять очередной каталог, мы должны дождаться завершения всех созданных в процессе его обхода потоков. def delete_all(dir) threads = [] Dir.foreach(dir) do |e| next if [".",".."].include? e # Пропустить . и .. fullname = dir + "/" + e if FileTest.directory?(fullname) threads << Thread.new(fullname) {|fn| delete_all(fn) } else File.delete(fullname) end end threads.each { |t| t.join } Dir.delete(dir) end delete_all("/tmp/stuff") Будет ли работать такая программа быстрее, чем ее вариант без потоков? В наших тестах получалось по-разному. Возможно, это зависит от операционной системы и структуры конкретного каталога — глубины, количества файлов и т.д. 13.3. ЗаключениеКак было сказано, в Ruby не используются платформенные потоки. Программа не станет работать быстрее при наличии нескольких процессоров, но некоторого распараллеливания работы достичь все же можно. Потоки полезны во многих случаях» но писать и отлаживать многопоточную программу довольно трудно, особенно если для получения правильного результата приходится применять изощренные способы синхронизации. Для синхронизации Ruby предоставляет такие классы, как Mutex, Monitorи ConditionVariable. Имеются также безопасные относительно потоков классы очередей Queueи SizedQueue. В главе 14 мы перейдем от обсуждения техники программирования к решению конкретных задач, а именно сценариев системного администрирования. Примечания:1 Огромное спасибо (яп.) 15 Пер. М.Кузмина. — Прим.ред. 16 Пер. С. Маршака. — Прим. ред. |
|
||
Главная | В избранное | Наш E-MAIL | Добавить материал | Нашёл ошибку | Наверх |
||||
|