• 13.1. Создание потоков и манипулирование ими
  • 13.1.1. Создание потоков
  • 13.1.2. Доступ к локальным переменным потока
  • 13.1.3. Опрос и изменение состояния потока
  • 13.1.4. Назначение рандеву (и получение возвращенного значения)
  • 13.1.5. Обработка исключений
  • 13.1.6. Группы потоков
  • 13.2. Синхронизация потоков
  • 13.2.1. Синхронизация с помощью критических секций
  • 13.2.2. Синхронизация доступа к ресурсам (mutex.rb)
  • 13.2.3. Предопределенные классы синхронизированных очередей
  • 13.2.4. Условные переменные
  • 13.2.5. Другие способы синхронизации
  • 13.2.6. Тайм-аут при выполнении операций
  • 13.2.7. Ожидание события
  • 13.2.8. Продолжение обработки во время ввода/вывода
  • 13.2.9. Реализация параллельных итераторов
  • 13.2.10. Параллельное рекурсивное удаление
  • 13.3. Заключение
  • Глава 13. Потоки в Ruby

    Он тянет нить своего красноречия искуснее, чем развивает свои доводы.

    (Шекспир, «Бесплодные усилия любви», акт V, сцена 1[15])

    Потоки еще иногда называют облегченными процессами. Это просто способ обеспечить параллельное выполнение без накладных расходов, связанных с контекстным переключением между процессами. (Впрочем, общего согласия по поводу того, что такое поток, нет, поэтому мы не будем углубляться в данный вопрос.)

    В Ruby потоки определены на пользовательском уровне и не зависят от операционной системы. Они работают в DOS так же, как и в UNIX. Но, конечно, это снижает производительность, а на сколько именно, зависит от операционной системы.

    Потоки полезны, например, тогда, когда некоторые части программы могут работать независимо друг от друга. Применяются они и в тех случаях, когда приложение тратит много времени на ожидание события. Часто, пока один поток ждет, другой может выполнять полезную работу.

    С другой стороны, у потоков есть и недостатки. Всегда надо взвешивать, оправданно ли их применение в конкретном случае. К тому же, иногда доступ к ресурсу принципиально должен осуществляться строго последовательно, поэтому потоки не дадут никакого выигрыша. И, наконец, бывает так, что накладные расходы на синхронизацию доступа к глобальным ресурсам превышают экономию, достигаемую за счет использования нескольких потоков.

    По этой и ряду других причин некоторые авторитеты вообще рекомендуют держаться подальше от многопоточного программирования. Действительно, такие программы сложны и подвержены ошибкам, которые трудно отлаживать. Но мы оставим читателю самому решать, когда стоит применять эту технику.

    Проблемы, связанные с несинхронизированными потоками, хорошо известны. При одновременном доступе к глобальным данным со стороны нескольких потоков данные могут быть запорчены. Если один поток делает какое-то допущение о том, что успел выполнить другой поток, возможна гонка (race condition); обычно это приводит к «недетерминированному» коду, который дает разные результаты при каждом запуске. Наконец, существует опасность тупиковой ситуации, когда ни один поток не может продолжить выполнение, поскольку ожидает ресурс, занятый другим потоком. Код, написанный так, что ни одна из этих проблем не возникает, называется безопасным относительно потоков.

    Не все в Ruby безопасно относительно потоков, но имеются методы синхронизации, которые позволяют контролировать доступ к переменным и ресурсам, защищать критические секции программы и избегать тупиковых ситуаций. Мы рассмотрим их в этой главе и проиллюстрируем на примерах.

    13.1. Создание потоков и манипулирование ими

    К числу основных операций над потоками относятся создание потока, передача ему входной информации и получение результатов, останов потока и т.д. Можно получить список запущенных потоков, опросить состояние потока и выполнить ряд других проверок.

    Ниже представлен обзор основных операций.

    13.1.1. Создание потоков

    Создать поток просто: достаточно вызвать метод new и присоединить блок, который будет исполняться в потоке.

    thread = Thread.new do

     # Предложения, исполняемые в потоке...

    end

    Возвращаемое значение — объект типа

    Thread
    . Главный поток программы может использовать его для управления вновь созданным потоком.

    А если нужно передать потоку параметры? Достаточно передать их методу

    Thread.new
    , который, в свою очередь, передаст их блоку.

    a = 4

    b = 5

    с = 6

    thread2 = Thread.new(а,b,с) do |a, x, y|

     # Манипуляции с a, x и y.

    end


    # Если переменная а будет изменена новым потоком,

    # то главный поток не получит об этом никакого уведомления.

    Параметры блока, являющиеся ссылками на существующие переменные, практически неотличимы от самих переменных. Поэтому, например, переменная

    а
    в каком-то смысле «опасна», что и отражено в комментарии.

    Поток может также обращаться к переменным из той же области видимости, в которой был создан. Ясно, что без синхронизации это может стать источником проблем. Главный и любой другой поток могут изменять такую переменную независимо друг от друга, и результаты подобных действий непредсказуемы.

    x = 1

    y = 2

    thread3 = Thread.new do

     # Этот поток может манипулировать переменными x and y

     # из внешней области видимости, но это не всегда безопасно.

     sleep(rand(0)) # Спать в течение случайно выбранного времени

     # (меньше секунды).

     x = 3

    end

    sleep(rand(0))

    puts x

    # Если запустить эту программу несколько раз подряд, то может быть

    # напечатано как 1, так и 3!

    У метода

    new
    есть синоним
    fork
    — это имя выбрано по аналогии с хорошо известным системным вызовом в UNIX.

    13.1.2. Доступ к локальным переменным потока

    Мы знаем об опасности доступа из потока к переменным, определенным вне его области видимости, но мы также знаем, что у потока могут быть локальные данные. А что делать, если поток хочет «обнародовать» часть принадлежащих ему данных?

    Для этой цели предусмотрен специальный механизм. Если объект

    Thread
    рассматривать как хэш, то к локальным данным потока можно обратиться из любого места в области видимости этого объекта. Мы не хотим сказать, что так можно обратиться к настоящим локальным переменным; это допустимо лишь для доступа к именованным данным, своим для каждого потока.

    Существует также метод

    key?
    , который сообщает, используется ли указанное имя в данном потоке.

    Внутри потока к таким данным тоже следует обращаться, как к хэшу. Метод

    Thread.current
    позволяет сделать запись чуть менее громоздкой.

    thread = Thread.new do

    t = Thread.current

    t[:var1] = "Это строка"

    t[:var2] = 365

    end


    # Доступ к локальным данным потока извне...


    x = thread[:var1]              # "Это строка"

    y = thread[:var2]              # 365


    has_var2 = thread.key?("var2") # true

    has_var3 = thread.key?("var3") # false

    Отметим, что эти данные доступны другим потокам даже после того, их владелец завершил работу (как в данном случае).

    Помимо символа (см. выше), для идентификации локальной переменной потока можно употреблять и строки.

    thread = Thread.new do

    t = Thread.current

    t["var3"] = 25

    t[:var4] = "foobar"

    end


    a = thread[:var3] = 25

    b = thread["var4"] = "foobar"

    He путайте эти специальные имена с настоящими локальными переменными. В следующем фрагменте разница видна более отчетливо:

    thread = Thread.new do

     t = Thread.current

     t["var3"] = 25

     t[:var4] = "foobar"

     var3 = 99         # Настоящие локальные переменные

     var4 = "zorch"    # (извне недоступны)

    end


    a = thread[:var3]  # 25

    b = thread["var4"] # "foobar"

    И еще отметим, что ссылку на объект (на настоящую локальную переменную) внутри потока можно использовать для сокращенной записи. Это справедливо, если вы сохраняете одну и ту же ссылку, а не создаете новую.

    thread = Thread.new do

     t = Thread.current

     x = "nXxeQPdMdxiBAxh"

     t[:my_message] = x

     x.reverse!

     x.delete! "x"

     x.gsub!(/[A-Z]/,"")

     # С другой стороны, присваивание создает новый объект,

     # поэтому "сокращение" становится бесполезным...

    end

    а = thread[:my_message] # "hidden"

    Ясно, что сокращение не будет работать и в том случае, когда вы имеете дело с объектами наподобие

    Fixnum
    , которые хранятся как непосредственные значения, а не ссылки.

    13.1.3. Опрос и изменение состояния потока

    В классе

    Thread
    есть несколько полезных методов класса. Метод
    list
    возвращает массив «живых» потоков, метод
    main
    возвращает ссылку на главный поток программы, который породил все остальные, а метод
    current
    позволяет потоку идентифицировать самого себя.

    t1 = Thread.new { sleep 100 }

    t2 = Thread.new do

     if Thread.current == Thread.main

     puts "Это главный поток." # HE печатается,

    end

    1.upto(1000) { sleep 0.1 }

    end


    count = Thread.list.size   # 3

    if Thread.list.include ?(Thread.main)

     puts "Главный поток жив." # Печатается всегда!

    end

    if Thread.current == Thread.main

     puts "Я главный поток."   # Здесь печатается...

    end

    Методы

    exit
    ,
    pass
    ,
    start
    ,
    stop
    и
    kill
    служат для управления выполнением потоков (как изнутри, так и извне):

    # в главном потоке...

    Thread.kill(t1)        # Завершить этот поток.

    Thread.pass            # Передать управление t2.

    t3 = Thread.new do

     sleep 20

     Thread.exit           # Выйти из потока.

     puts "Так не бывает!" # Никогда не выполняется.

    end

    Thread.kill(t2)        # Завершить t2.

    # Выйти из главного потока (все остальные тоже завершаются).

    Thread.exit

    Отметим, что не существует метода экземпляра

    stop
    , поэтому поток может приостановить собственное выполнение, но не выполнение другого потока.

    Существуют различные методы для опроса состояния потока. Метод экземпляра

    alive?
    сообщает, является ли данный поток «живым» (не завершил выполнение), а метод
    stop?
    — находится ли он в состоянии «приостановлен».

    count = 0

    t1 = Thread.new { loop { count += 1 } }

    t2 = Thread.new { Thread.stop }

    sleep 1

    flags = [t1.alive?, # true

             t1.stop?,  # false

             t2.alive?, # true

             t2.stop?]  # true

    Получить состояние потока позволяет метод

    status
    . Он возвращает значение
    "run"
    , если поток выполняется;
    "sleep"
    — если он приостановлен, спит или ожидает результата ввода/вывода;
    false
    — если поток нормально завершился, и
    nil
    — если поток завершился в результате исключения.

    t1 = Thread.new { loop {} }

    t2 = Thread.new { sleep 5 }

    t3 = Thread.new { Thread.stop }

    t4 = Thread.new { Thread.exit }

    t5 = Thread.new { raise "exception" }

    s1 = t1.status # "run"

    s2 = t2.status # "sleep"

    s3 = t3.status # "sleep"

    s4 = t4.status # false

    s5 = t5.status # nil

    Глобальную переменную

    $SAFE
    можно установить по-разному в разных потоках. Стало быть, она вовсе не является глобальной, но стоит ли жаловаться на это, если она позволяет разным потокам работать с разным уровнем безопасности? Метод
    safe_level
    возвращает текущий уровень безопасности потока.

    t1 = Thread.new { $SAFE = 1; sleep 5 }

    t2 = Thread.new { $SAFE = 3; sleep 5 }

    sleep 1

    lev0 = Thread.main.safe_level # 0

    lev1 = t1.safe_level          # 1

    lev2 = t2.safe_level          # 3

    Метод доступа

    priority
    позволяет узнать и изменить приоритет потока:

    t1 = Thread.new { loop { sleep 1 } }

    t2 = Thread.new { loop { sleep 1 } }

    t2.priority = 3  # Установить для потока t2 приоритет 3

    p1 = t1.priority # 0

    p2 = t2.priority # 3

    Поток с большим приоритетом будет чаще получать процессорное время. Специальный метод

    pass
    позволяет передать управление планировщику. Иными словами, поток просто уступает свой временной квант, но не приостанавливается и не засыпает.

    t1 = Thread.new do

     puts "alpha"

     Thread.pass

     puts "beta"

    end

    t2 = Thread.new do

     puts "gamma"

     puts "delta"

    end


    t1.join

    t2.join

    В этом искусственном примере вызов

    Thread.pass
    приводит к печати строк в следующем порядке:
    alpha gamma delta beta
    . Без него было бы напечатано
    alpha beta gamma delta
    . Конечно, этот механизм следует использовать не для синхронизации, а только для экономного расходования процессорного времени.

    Выполнение приостановленного потока можно возобновить методами методами

    run
    или
    wakeup
    :

    t1 = Thread.new do

     Thread.stop

     puts "Здесь есть изумруд."

    end

    t2 = Thread.new do

     Thread.stop

     puts "Вы находитесь в точке Y2."

    end

    sleep 1

    t1.wakeup

    t2.run

    Между этими методами есть тонкое различие. Метод

    wakeup
    изменяет состояние потока, так что он становится готовым к выполнению, но не запускает его немедленно. Метод же
    run
    пробуждает поток и сразу же планирует его выполнение.

    В данном случае

    t1
    просыпается раньше
    t2
    , но
    t2
    планируется первым, что приводит к следующему результату:

    Вы находитесь в точке Y2.

    Здесь есть изумруд.

    Конечно, было бы неосмотрительно реализовывать синхронизацию на основе этого механизма.

    Метод экземпляра

    raise
    возбуждает исключение в потоке, от имени которого вызван. (Этот метод необязательно вызывать в том потоке, которому адресовано исключение.)

    factorial1000 = Thread.new do

     begin

      prod = 1

      1.upto(1000) {|n| prod *= n }

      puts "1000! = #{prod}"

     rescue

      # Ничего не делать...

     end

    end


    sleep 0.01 # На вашей машине значение может быть иным.

    if factorial1000.alive?

     factorial1000.raise("Стоп!")

     puts "Вычисление было прервано!"

    else

     puts "Вычисление успешно завершено."

    end

    Поток, запущенный в предыдущем примере, пытался вычислить факториал 1000. Если для этого не хватило одной сотой секунды, то главный поток завершит его. Как следствие, на относительно медленной машине будет напечатано сообщение «Вычисление было прервано!» Что касается части

    rescue
    внутри потока, то в ней мог бы находиться любой код, как, впрочем, и всегда.

    13.1.4. Назначение рандеву (и получение возвращенного значения)

    Иногда главный поток хочет дождаться завершения другого потока. Для этой цели предназначен метод

    join
    :

    t1 = Thread.new { do_something_long() }


    do_something_brief()

    t1.join # Ждать завершения t1.

    Отметим, что вызывать метод

    join
    необходимо, если нужно дождаться завершения другого потока. В противном случае главный поток завершится, а вместе с ним и все остальные. Например, следующий код никогда не напечатал бы окончательный ответ, не будь в конце вызова
    join
    :

    meaning_of_life = Thread.new do

     puts "Смысл жизни заключается в..."

     sleep 10

     puts 42

    end


    sleep 9

    meaning_of_life.join

    Существует полезная идиома, позволяющая вызвать метод

    join
    для всех «живых» потоков, кроме главного (ни один поток, даже главный, не может вызывать
    join
    для самого себя).

    Thread.list.each { |t| t.join if t != Thread.main }

    Конечно, любой поток, а не только главный, может вызвать

    join
    для любого другого потока. Если главный поток и какой-то другой попытаются вызвать
    join
    друг для друга, возникнет тупиковая ситуация. Интерпретатор обнаружит это и завершит программу.

    thr = Thread.new { sleep 1; Thread.main.join }


    thr.join # Тупиковая ситуация!

    С потоком связан блок, который может возвращать значение. Следовательно, и сам поток может возвращать значение. Метод

    value
    неявно вызывает
    join
    и ждет, пока указанный поток завершится, а потом возвращает значение последнего вычисленного в потоке выражения.

    max = 10000

    thr = Thread.new do

     sum = 0

     1.upto(max) { |i| sum += i }

     sum

    end


    guess = (max*(max+1))/2

    print "Формула "

    if guess == thr.value

     puts "правильна."

    else

     puts "неправильна."

    end

    13.1.5. Обработка исключений

    Что произойдет, если в потоке возникнет исключение? Как выясняется, поведение можно сконфигурировать заранее.

    Существует флаг

    abort_on_exception
    , который работает как на уровне класса, так и на уровне экземпляра. Он реализован в виде метода доступа (то есть позволяет читать и устанавливать атрибут) на обоих уровнях. Если
    abort_on_exception
    для некоторого потока равен
    true
    , то при возникновении в этом потоке исключения будут завершены и все остальные потоки.

    Thread.abort_on_exception = true


    t1 = Thread.new do

     puts "Привет!"

     sleep 2

     raise "some exception"

     puts "Пока!"

    end


    t2 = Thread.new { sleep 100 }


    sleep 2

    puts "Конец"

    В этом примере флаг

    abort_on_exception
    установлен в
    true
    на уровне системы в целом (отменяя подразумеваемое по умолчанию значение). Следовательно, когда в потоке
    t1
    возникает исключение, завершаются и
    t1
    , и главный поток. Печатается только слово «Привет!».

    В следующем примере эффект такой же:

    t1 = Thread.new do

     puts "Привет!"

     sleep 2

     raise "some exception"

     puts "Пока!"

    end


    t1.abort_on_exception = true


    t2 = Thread.new { sleep 100 }


    sleep 2

    puts "Конец"

    А вот в следующем оставлено принимаемое по умолчанию значение

    false
    , и мы наконец-то видим слово «Конец», печатаемое главным потоком (слова «Пока!» мы не увидим никогда, поскольку поток
    t1
    при возникновении исключения завершается безусловно).

    t1 = Thread.new do

     puts "Привет!"

     sleep 2

     raise "some exception"

     puts "Пока!"

    end


    t2 = Thread.new { sleep 100 }


    sleep 2

    puts "Конец"


    # Выводится:


    Привет!

    Конец

    13.1.6. Группы потоков

    Группа потоков — это механизм управления логически связанными потоками. По умолчанию все потоки принадлежат группе

    Default
    (это константа класса). Но если создать новую группу, то в нее можно будет помещать потоки.

    В любой момент времени поток может принадлежать только одной группе. Если поток помещается в группу, то он автоматически удаляется из той группы, которой принадлежал ранее.

    Метод класса

    ThreadGroup.new
    создает новую группу потоков, а метод экземпляра
    add
    помещает поток в группу.

    f1 = Thread.new("file1") { |file| waitfor(file) }

    f2 = Thread.new("file2") { |file| waitfor(file) }


    file_threads = ThreadGroup.new

    file_threads.add f1

    file_threads.add f2

    Метод экземпляра

    list
    возвращает массив всех потоков, принадлежащих данной группе.

    # Подсчитать все "живые" потоки в группе this_group.

    count = 0

    this_group.list.each {|x| count += 1 if x.alive? }

    if count < this_group.list.size

     puts "Некоторые потоки в группе this_group уже скончались."

    else

     puts "Все потоки в группе this_group живы."

    end

    В класс

    ThreadGroup
    можно добавить немало полезных методов. В примере ниже показаны методы для возобновления всех потоков, принадлежащих группе, для группового ожидания потоков (с помощью
    join
    ) и для группового завершения потоков:

    class ThreadGroup


    def wakeup

     list.each { |t| t.wakeup }

    end


    def join

     list.each { |t| t.join if t != Thread.current }

    end


    def kill

     list.each { |t| t.kill }

    end


    end

    13.2. Синхронизация потоков

    Почему необходима синхронизация? Потому что из-за «чередования» операций доступ к переменным и другим сущностям может осуществляться в порядке, который не удается установить путем чтения исходного текста отдельных потоков. Два и более потоков, обращающихся к одной и той же переменной, могут взаимодействовать между собой непредвиденными способами, и отлаживать такую программу очень трудно.

    Рассмотрим простой пример:

    x = 0

    t1 = Thread.new do

     1.upto(1000) do

      x = x + 1

     end

    end


    t2 = Thread.new do

     1.upto(1000) do

      x = x + 1

     end

    end


    t1.join

    t2.join

    puts x

    Сначала переменная

    x
    равна 0. Каждый поток увеличивает ее значение на тысячу раз. Логика подсказывает, что в конце должно быть напечатано 2000.

    Но фактический результат противоречит логике. На конкретной машине было напечатано значение 1044. В чем дело?

    Мы предполагали, что инкремент целого числа — атомарная (неделимая) операция. Но это не так. Рассмотрим последовательность выполнения приведенной выше программы. Поместим поток

    t1
    слева, а поток
    t2
    справа. Каждый квант времени занимает одну строчку и предполагается, что к моменту, когда был сделан этот мгновенный снимок, переменная
    x
    имела значение 123.

    t1                            t2

    --------------------------    -----------------------------

    Прочитать значение x (123)

                                  Прочитать значение x (123)

    Увеличить значение на 1 (124)

                                  Увеличить значение на 1 (124)

    Записать результат в x

                                  Записать результат в x

    Ясно, что каждый поток увеличивает на 1 то значение, которое видит. Но не менее ясно и то, что после увеличения на 1 обоими потоками

    x
    оказалось равно всего 124.

    И это лишь самая простая из проблем, возникающих в связи с синхронизацией. Для решения более сложных приходится прилагать серьезные усилия — это предмет изучения специалистами в области теоретической информатики и математики.

    13.2.1. Синхронизация с помощью критических секций

    Простейший способ синхронизации дают критические секции. Когда поток входит в критическую секцию программы, гарантируется, что никакой другой поток не войдет в нее, пока первый не выйдет.

    Если акцессору

    Thread.critical
    присвоить значение
    true
    , то выполнение других потоков не будет планироваться. В следующем примере мы переработали код предыдущего, воспользовавшись акцессором
    critical
    для определения критической области, которая защищает уязвимые участки программы.

    x = 0

    t1 = Thread.new do

     1.upto(1000) do

      Thread.critical = true

      x = x + 1

      Thread.critical = false

     end

    end


    t2 = Thread.new do

     1.upto(1000) do

      Thread.critical = true

      x = x + 1

      Thread.critical = false

     end

    end


    t1.join

    t2.join

    puts x

    Теперь последовательность выполнения изменилась; взгляните, в каком порядке работают потоки

    t1
    и
    t2
    . (Конечно, вне того участка, где происходит увеличение переменной, потоки могут чередоваться более-менее случайным образом.)

    t1                            t2

    ----------------------------- -----------------------------

    Прочитать значение x (123)

    Увеличить значение на 1 (124)

    Записать результат в x

                                  Прочитать значение x (124)

                                  Увеличить значение на 1 (125)

                                  Записать результат в x

    Возможны такие комбинации операций с потоками, при которых поток планируется даже тогда, когда какой-то другой поток находится в критической секции.

    Простейший случай — вновь созданный поток начинает исполнение немедленно вне зависимости от того, занимает какой-то другой поток критическую секцию или нет. Поэтому описанную технику лучше применять только в самых простых ситуациях.

    13.2.2. Синхронизация доступа к ресурсам (mutex.rb)

    В качестве примера рассмотрим задачу индексирования Web-сайтов. Мы извлекаем слова из многочисленных страниц в Сети и сохраняем их в хэше. Ключом является само слово, а значением — строка, идентифицирующая документ и номер строки в этом документе.

    Постановка задачи и так достаточно груба. Но мы огрубим ее еще больше, введя следующие упрощающие допущения:

    • будем представлять удаленные документы в виде строк;

    • ограничимся всего тремя строками (они будут «зашиты» в код);

    • сетевые задержки будем моделировать «засыпанием» на случайный промежуток времени.

    Взгляните на программу в листинге 13.1. Она даже не печатает получаемые данные целиком, а выводит лишь счетчик слов (не уникальный). Каждый раз при чтении или обновлении хэша мы вызываем метод

    hesitate
    , который приостанавливает поток на случайное время. Тем самым поведение программы становится недетерминированным и приближенным к реальности.

    Листинг 13.1. Программа индексирования с ошибками (гонка)

    @list = []

    @list[0]="shoes ships\nsealing-wax"

    @list[1]="cabbages kings"

    @list[2]="quarks\nships\ncabbages"


    def hesitate

     sleep rand(0)

    end


    @hash = {}


    def process_list(listnum)

     lnum = 0

     @list[listnum].each do |line|

      words = line.chomp.split

      words.each do |w|

       hesitate

       if @hash[w]

        hesitate

        @hash[w] += ["#{listnum}:#{lnum}"]

       else

        hesitate

        @hash[w] = ["#{listnum}:#{lnum}"]

       end

      end

      lnum += 1

     end

    end


    t1 = Thread.new(0) {|num| process_list(num) }

    t2 = Thread.new(1) {|num| process_list(num) }

    t3 = Thread.new(2) {|num| process_list(num) }


    t1.join

    t2.join

    t3.join


    count = 0

    @hash.values.each {|v| count += v.size }


    puts "Всего слов: #{count} " # Может быть напечатано 7 или 8!

    Здесь имеется проблема. Если ваша система ведет себя примерно так же, как наша, то программа может напечатать одно из двух значений! В наших тестах с одинаковой вероятностью печаталось 7 или 8. Если слов и списков больше, то и разброс окажется более широким.

    Попробуем исправить положение с помощью мьютекса, который будет контролировать доступ к разделяемому ресурсу. (Слово «mutex» — это сокращение от mutual exclusion, «взаимная блокировка».)

    Обратимся к листингу 13.2. Библиотека

    Mutex
    позволяет создавать мьютексы и манипулировать ими. Мы можем захватить (lock) мьютекс перед доступом к хэшу и освободить (unlock) его по завершении операции.

    Листинг 13.2. Программа индексирования с мьютексом

    require 'thread.rb'


    @list = []

    @list[0]="shoes ships\nsealing-wax"

    @list[1]="cabbages kings"

    @list[2]="quarks\nships\ncabbages"


    def hesitate

     sleep rand(0)

    end


    @hash = {}


    @mutex = Mutex.new


    def process_list(listnum)

     lnum = 0

     @list[listnum].each do |line|

      words = line.chomp.split

      words.each do |w|

       hesitate

       @mutex.lock

       if @hash[w]

        hesitate

        @hash[w] += ["#{listnum}:#{lnum}"]

       else

        hesitate

        @hash[w] = ["#{listnum}:#{lnum}"]

       end

       @mutex.unlock

      end

      lnum += 1

     end

    end


    t1 = Thread.new(0) {|num| process_list(num) }

    t2 = Thread.new(1) {|num| process_list(num) }

    t3 = Thread.new(2) {|num| process_list(num) }


    t1.join

    t2.join

    t3.join


    count = 0

    @hash.values.each {|v| count += v.size }


    puts "Всего слов: #{count} " # Всегда печатается 8!

    Отметим, что помимо метода

    lock
    в классе
    Mutex
    есть также метод
    try_lock
    . Он отличается от
    lock
    тем, что если мьютекс уже захвачен другим потоком, то он не дожидается освобождения, а сразу возвращает
    false
    .

    require 'thread'


    mutex = Mutex.new

    t1 = Thread.new { mutex.lock; sleep 30 }


    sleep 1


    t2 = Thread.new do

     if mutex.try_lock

      puts "Захватил"

     else

      puts "He сумел захватить" # Печатается немедленно.

     end

    end


    sleep 2

    Эта возможность полезна, если поток не хочет приостанавливать выполнение. Есть также метод

    synchronize
    , который захватывает мьютекс, а потом автоматически освобождает его.

    mutex = Mutex.new


    mutex.synchronize do

     # Любой код, нуждающийся в защите...

    end

    Существует еще библиотека

    mutex_m
    , где определен модуль
    Mutex_m
    , который можно подмешивать к классу (или использовать для расширения объекта). У такого расширенного объекта будут все методы мьютекса, так что он сам может выступать в роли мьютекса.

    require 'mutex_m'


    class MyClass

     include Mutex_m


     # Теперь любой объект класса MyClass может вызывать

     # методы lock, unlock, synchronize...

     # Внешние объекты также могут вызывать эти

     # методы для объекта MyClass.

    end

    13.2.3. Предопределенные классы синхронизированных очередей

    В библиотеке

    thread.rb
    есть пара классов, которые иногда бывают полезны. Класс
    Queue
    реализует безопасную относительно потоков очередь, доступ к обоим концам которой синхронизирован. Это означает, что разные потоки могут, ничего не опасаясь, работать с такой очередью. Класс
    SizedQueue
    отличается от предыдущего тем, что позволяет ограничить размер очереди (число элементов в ней).

    Оба класса имеют практически один и тот же набор методов, поскольку

    SizedQueue
    наследует
    Queue
    . Правда, в подклассе определен еще акцессор max, позволяющий получить и установить максимальный размер очереди.

    buff = SizedQueue.new(25)

    upper1 = buff.max #25

    # Увеличить размер очереди...

    buff.max = 50

    upper2 = buff.max # 50

    В листинге 13.3 приведено решение задачи о производителе и потребителе. Для производителя задержка (аргумент sleep) чуть больше, чем для потребителя, чтобы единицы продукции «накапливались».

    Листинг 13.3. Задача о производителе и потребителе

    require 'thread'


    buffer = SizedQueue.new(2)


    producer = Thread.new do

     item = 0

     loop do

      sleep rand 0

      puts "Производитель произвел #{item}"

      buffer.enq item

      item += 1

     end

    end


    consumer = Thread.new do

     loop do

      sleep (rand 0)+0.9

      item = buffer.deq

      puts "Потребитель потребил #{item}"

      puts " ожидает = #{buffer.num_waiting}"

     end

    end


    sleep 60 # Работать одну минуту, потом завершить оба потока.

    Чтобы поместить элемент в очередь и извлечь из нее, рекомендуется применять соответственно методы

    enq
    и
    deq
    . Можно было бы для помещения в очередь пользоваться также методом
    push
    , а для извлечения — методами
    pop
    и
    shift
    , но их названия не так мнемоничны в применении к очередям.

    Метод

    empty?
    проверяет, пуста ли очередь, а метод
    clear
    опустошает ее. Метод
    size
    (и его синоним
    length
    ) возвращает число элементов в очереди.

    # Предполагается, что другие потоки не мешают...


    buff = Queue.new

    buff.enq "one"

    buff.enq "two"

    buff.enq "three"

    n1 = buff.size      # 3

    flag1 = buff.empty? # false

    buff.clear

    n2 = buff.size      # 0

    flag2 = buff.empty? # true

    Метод

    num_waiting
    возвращает число потоков, ожидающих доступа к очереди. Если размер очереди не ограничен, то это потоки, ожидающие возможности удалить элементы; для ограниченной очереди включаются также потоки, пытающиеся добавить элементы.

    Необязательный параметр

    non_block
    метода
    deq
    в классе
    Queue
    по умолчанию равен
    false
    . Если же он равен
    true
    , по при попытке извлечь элемент из пустой очереди он не блокирует поток, а возбуждает исключение
    ThreadError
    .

    13.2.4. Условные переменные

    Да зовите моих скрипачей, трубачей...

    («Веселый король» (детский стишок)[16])

    Условная переменная — это, по существу, очередь потоков. Они используются в сочетании с мьютексами для лучшего управления синхронизацией потоков.

    Условная переменная всегда ассоциируется с каким-то мьютексом. Ее назначение — освободить мьютекс до тех пор, пока не начнет выполняться определенное условие. Представьте себе ситуацию, когда поток захватил мьютекс, но не готов продолжать выполнение. Тогда он может заснуть под контролем условной переменной, ожидая, что будет разбужен, когда условие станет истинным.

    Важно понимать, что пока поток ждет условную переменную, мьютекс свободен, поэтому другие потоки могут получить доступ к защищенному им ресурсу. А как только другой поток сигнализирует этой переменной, ожидающий поток пробуждается и пытается вновь захватить мьютекс.

    Рассмотрим несколько искусственный пример в духе задачи об обедающих философах. Представьте себе, что вокруг стола сидят три скрипача, ожидающих своей очереди поиграть. Но у них есть всего две скрипки и один смычок. Понятно, что скрипач сможет играть, только если одновременно завладеет одной из скрипок и смычком.

    Мы поддерживаем счетчики свободных скрипок и смычков. Когда скрипач хочет получить скрипку и смычок, он должен ждать их освобождения. В программе ниже мы защитили проверку условия мьютексом и под его защитой ждем скрипку и смычок порознь. Если скрипка или смычок заняты, поток засыпает. Он не владеет мьютексом до тех пор, пока другой поток не просигнализирует о том, что ресурс свободен. В этот момент первый поток просыпается и снова захватывает мьютекс.

    Код представлен в листинге 13.4.

    Листинг 13.4. Три скрипача

    require 'thread'


    @music = Mutex.new

    @violin = ConditionVariable.new

    @bow = ConditionVariable.new


    @violins_free = 2

    @bows_free = 1


    def musician(n)

     loop do

      sleep rand(0)

      @music.synchronize do

       @violin.wait(@music) while @violins_frее == 0

       @violins_free -= 1

       puts "#{n} владеет скрипкой"

       puts "скрипок #@violins_frее, смычков #@bows_free"


       @bow.wait(@music) while @bows_free == 0

       @bows_free -= 1

       puts "#{n} владеет смычком"

       puts "скрипок #@violins_free, смычков #@bows_free"

      end


      sleep rand(0)

      puts "#{n}: (...играет...)"

      sleep rand(0)

      puts "#{n}: Я закончил."


      @music.synchronize do

       @violins_free += 1

       @violin.signal if @violins_free == 1

       @bows_free += 1

       @bow.signal if @bows_free == 1

      end

     end

    end


    threads = []

    3.times {|i| threads << Thread.new { musician(i) } }


    threads.each {|t| t.join }

    Мы полагаем, что это решение никогда не приводит к тупиковой ситуации, хотя доказать этого не сумели. Но интересно отметить, что описанный алгоритм не справедливый. В наших тестах оказалось, что первый скрипач играет чаще двух остальных, а второй чаще третьего. Выяснение причин такого поведения и его исправление мы оставляем читателю в качестве интересного упражнения.

    13.2.5. Другие способы синхронизации

    Еще один механизм синхронизации - это монитор, который в Ruby реализован в библиотеке

    monitor.rb
    . Это более развитый по сравнению с мьютексом механизм, основное отличие состоит в том, что захваты одного и того же мьютекса не могут быть вложенными, а монитора — могут.

    Тривиальный случай возникновения такой ситуации вряд ли возможен. В самом деле, кто станет писать такой код:

    @mutex = Mutex.new


    @mutex.synchronize do

     @mutex.synchronize do

      #...

     end

    end

    Но нечто подобное может произойти в сложной программе (или при рекурсивном вызове метода). Какова бы ни была причина, последствием будет тупиковая ситуация. Уход от нее — одно из достоинств модуля-примеси

    Monitor
    .

    @mutex = Mutex.new


    def some_method

     @mutex.synchronize do

      #...

      some_other_method # Тупиковая ситуация!

     end

    end


    def some_other_method

     @mutex.synchronize do

      #...

     end

    end

    Модуль-примесь

    Monitor
    обычно применяется для расширения объекта. Для создания условной переменной предназначен метод
    new_cond
    .

    Класс

    ConditionVariable
    в библиотеке
    monitor.rb
    дополнен по сравнению с определением в библиотеке
    thread
    . У него есть методы
    wait_until
    и
    wait_while
    , которые блокируют поток в ожидании выполнения условия. Кроме того, возможен тайм-аут при ожидании, поскольку у метода
    wait
    имеется параметр
    timeout
    , равный количеству секунд (по умолчанию
    nil
    ).

    Поскольку примеры работы с потоками у нас кончаются, то в листинге 13.5 мы предлагаем реализацию классов

    Queue
    и
    SizedQueue
    с помощью монитора. Код приводится с разрешения автора, Шуго Маэда (Shugo Maeda).

    Листинг 13.5. Реализация класса Queue с помощью монитора

    # Автор: Shugo Maeda


    require 'monitor'


    class Queue

     def initialize

      @que = []

      @monitor = Monitor.new

      @empty_cond = @monitor.new_cond

     end


     def enq(obj)

      @monitor.synchronize do

       @que.push(obj)

       @empty_cond.signal

      end

     end


     def deq

      @monitor.synchronize do

       while @que.empty?

        @empty_cond.wait

       end

       return @que.shift

      end

     end

    end


    class SizedQueue < Queue

     attr :max


     def initialize(max)

      super()

      @max = max

      @full_cond = @monitor.new_cond

     end


     def enq(obj)

      @monitor.synchronize do

       while @que.length >= @max

        @full_cond.wait

       end

       super(obj)

      end

     end


     def deq

      @monitor.synchronize do

       obj = super

       if @que.length < @max

        @full_cond.signal

       end

       return obj

      end

     end


     def max=(max)

      @monitor.synchronize do

       @max = max

       @full_cond.broadcast

      end

     end

    end

    Еще один вариант синхронизации (двузначную блокировку со счетчиком) предлагает библиотека

    sync.rb
    . В ней определен модуль
    Sync_m
    , который можно применять вместе с ключевыми словами
    include
    и
    extend
    (как и
    Mutex_m
    ). Этот модуль содержит методы
    locked?
    ,
    shared?
    ,
    exclusive?
    ,
    lock
    ,
    unlock
    и
    try_lock
    .

    13.2.6. Тайм-аут при выполнении операций

    Часто встречается ситуация, когда на выполнение операции отводится определенное максимальное время. Это позволяет избежать бесконечных циклов и более строго контролировать порядок работы. Подобная возможность очень полезна, в частности, в сетевых приложениях, где ответ от сервера может и не прийти.

    Библиотека

    timeout.rb
    предлагает решение этой проблемы на основе потоков (см. листинг 13.6). С методом
    timeout
    ассоциирован выполняемый блок. Если истечет заданное число секунд, метод возбуждает исключение
    TimeoutError
    , которое можно перехватить с помощью
    rescue
    .

    Листинг 13.6. Пример тайм-аута

    require 'timeout.rb'


    flag = false

    answer = nil


    begin

     timeout(5) do

      puts "Хочу печенье!"

      answer = gets.chomp

      flag = true

     end

     rescue TimeoutError

     flag = false

    end


    if flag

     if answer == "cookie"

      puts "Спасибо! Хрум, хрум..."

     else

      puts "Это же не печенье!"

      exit

     end

    else

     puts "Эй, слишком медленно!"

     exit

    end


    puts "До встречи..."

    13.2.7. Ожидание события

    Часто один или несколько потоков следят за «внешним миром», а остальные выполняют полезную работу. Все примеры в этом разделе надуманные, но общий принцип они все же иллюстрируют.

    В следующем примере прикладную задачу решают три потока. Четвертый поток каждые пять секунд просыпается, проверяет глобальную переменную

    $flag
    и, когда видит, что флаг поднят, пробуждает еще два потока. Это освобождает три рабочих потока от необходимости напрямую общаться с двумя другими и, возможно, от многочисленных попыток разбудить их.

    $flag = false

    work1 = Thread.new { job1() }

    work2 = Thread.new { job2() }

    work3 = Thread.new { job3() }


    thread4 = Thread.new { Thread.stop; job4() }

    thread5 = Thread.new { Thread.stop; job5() }


    watcher = Thread.new do

     loop do

      sleep 5

      if $flag

       thread4.wakeup

       thread5.wakeup

       Thread.exit

      end

     end

    end

    Если в какой-то момент выполнения метода

    job
    , переменная
    $flag
    станет равной
    true
    , то в течение пяти секунд после этого потоки
    thread4
    и
    thread5
    гарантированно запустятся. После этого поток
    watcher
    завершается.

    В следующем примере мы ждем создания файла. Каждые 30 секунд проверяется его существование, и как только файл появится, мы запускаем новый поток. Тем временем остальные потоки занимаются своим делом. На самом деле ниже мы наблюдаем за тремя разными файлами.

    def waitfor(filename)

     loop do

      if File.exist? filename

       file_processor = Thread.new { process_file(filename) }

       Thread.exit

      else

       sleep 30

      end

     end

    end


    waiter1 = Thread.new { waitfor("Godot") }

    sleep 10

    waiter2 = Thread.new { waitfor("Guffman") }

    sleep 10

    headwaiter = Thread.new { waitfor("head") }


    # Основной поток занимается другими делами...

    Есть много ситуаций, когда поток должен ожидать внешнего события (например, в сетевых приложениях так бывает, когда сервер на другом конце соединения работает медленно или ненадежно).

    13.2.8. Продолжение обработки во время ввода/вывода

    Часто приложению приходится выполнять одну или более длительных операций ввода/вывода. Прежде всего, речь идет о вводе данных с клавиатуры, поскольку человек печатает куда медленнее, чем вращается диск. Это время можно употребить на пользу с помощью потоков.

    Возьмем, к примеру, шахматную программу, которая должна ждать, пока человек сделает ход. Конечно, мы можем изложить только сам принцип, не вдаваясь в технические детали.

    Предположим, что итератор

    predict_move
    генерирует вероятные ходы человека (и ответные ходы программы). Тогда в момент, когда человек сделает ход, не исключено, что у компьютера уже будет готов ответ.

    scenario = {} # Хэш ход-ответ.

    humans_turn = true

    thinking_ahead = Thread.new(board) do

     predict_move do |m|

      scenario[m] = my_response(board,m)

      Thread.exit if humans_turn == false

     end

    end


    human_move = get_human_move(board)

    humans_turn = false # Остановить поток.


    # Теперь можно посмотреть, нет ли в хэше scenario хода,

    # сделанного пользователем...

    Конечно, настоящие шахматные программы работают не так.

    13.2.9. Реализация параллельных итераторов

    Предположим, что нужно параллельно обходить несколько объектов, то есть для каждого объекта найти первый элемент, потом второй, потом третий и т.д.

    Рассмотрим следующий пример. Пусть

    compose
    — имя магического метода, который выполняет композицию итераторов. Допустим еще, что у каждого объекта есть стандартный итератор
    each
    и что каждый объект возвращает по одному элементу на каждой итерации.

    arr1 = [1, 2, 3, 4]

    arr2 = [5, 10, 15, 20]

    compose(arr1, arr2) {|a,b| puts "#{а} и #{b}" }


    # Должно быть напечатано:

    # 1 и 5

    # 2 и 10

    # 3 и 15

    # 4 и 20

    Можно было бы, конечно, использовать для этой цели

    zip
    . Но если нужно более элегантное решение, при котором все элементы не будут храниться одновременно, то без потоков не обойтись. Такое решение представлено в листинге 13.7.

    Листинг 13.7. Параллельные итераторы

    def compose(*objects)

     threads = []

     for obj in objects do

      threads << Thread.new(obj) do |myobj|

       me = Thread.current

       me[:queue] = []

       myobj.each {|x| me[:queue].push(x) }

      end

     end


     list = [0]                     # Фиктивное значение, отличное от nil.

     while list.nitems > 0 do       # Еще есть не nil.

      list = []

      for thr in threads

       list << thr[:queue].shift    # Удалить по одному из каждого.

      end

      yield list if list.nitems > 0 # He вызывать yield, если все равны nil.

     end

    end


    x = [1, 2, 3, 4, 5, 6, 7, 8]

    y = "    первый\n    второй\n    третий\n четвертый\n    пятый\n"

    z = %w[a b с d e f]


    compose(x, у, z) {|a,b,c| p [a, b, c] }


    # Выводится:

    # [1, "    первый\n", "a"]

    # [2, "    второй\n", "b"]

    # [3, "    третий\n", "c"]

    # [4, " четвертый\n", "d"]

    # [5, "     пятый\n", "e"]

    # [6, nil, "f"]

    # [7, nil, nil]

    # [8, nil, nil]

    Обратите внимание: мы не предполагаем, что все объекты имеют одно и то же число элементов. Если один итератор доходит до конца раньше остальных, то он будет генерировать значения

    nil
    до тех пор, пока не закончит работу «самый длинный» итератор.

    Конечно, можно написать и более общий метод, который на каждой итерации будет обрабатывать более одного элемента. (В конце концов, не все итераторы возвращают по одному значению за раз.) Можно было бы в первом параметре передавать число значений для каждого итератора.

    Можно также пользоваться произвольными итераторами (а не только стандартным

    each
    ). Их имена можно было бы передавать в виде строк, а вызывать с помощью метода
    send
    . Много чего еще можно придумать.

    Впрочем, мы полагаем, что приведенного кода достаточно для большинства целей. Вариации на эту тему оставляем читателю в качестве упражнения.

    13.2.10. Параллельное рекурсивное удаление

    Забавы ради напишем код, который будет удалять дерево каталогов. Процедура рекурсивного удаления использует потоки. Как только обнаруживается очередной подкаталог, мы запускаем новый поток, который будет обходить его и удалять содержимое.

    Созданные в ходе работы программы потоки хранятся в массиве

    threads
    . Поскольку это локальная переменная, у каждого потока будет собственная копия массива. Раз к ней может обращаться всего один поток, синхронизировать доступ не надо.

    Отметим также, что в блок потока передается полное имя файла

    fullname
    , чтобы не нужно было беспокоиться по поводу того, что поток обращается к переменной, которую кто-то еще изменяет. Поток делает для себя локальную копию
    fn
    этой переменной.

    Прежде чем удалять очередной каталог, мы должны дождаться завершения всех созданных в процессе его обхода потоков.

    def delete_all(dir)

     threads = []

     Dir.foreach(dir) do |e|

      next if [".",".."].include? e # Пропустить . и ..

      fullname = dir + "/" + e

      if FileTest.directory?(fullname)

       threads << Thread.new(fullname) {|fn| delete_all(fn) }

      else

       File.delete(fullname)

      end

     end

     threads.each { |t| t.join }

     Dir.delete(dir)

    end

    delete_all("/tmp/stuff")

    Будет ли работать такая программа быстрее, чем ее вариант без потоков? В наших тестах получалось по-разному. Возможно, это зависит от операционной системы и структуры конкретного каталога — глубины, количества файлов и т.д.

    13.3. Заключение

    Как было сказано, в Ruby не используются платформенные потоки. Программа не станет работать быстрее при наличии нескольких процессоров, но некоторого распараллеливания работы достичь все же можно. Потоки полезны во многих случаях» но писать и отлаживать многопоточную программу довольно трудно, особенно если для получения правильного результата приходится применять изощренные способы синхронизации.

    Для синхронизации Ruby предоставляет такие классы, как

    Mutex
    ,
    Monitor
    и
    ConditionVariable
    . Имеются также безопасные относительно потоков классы очередей
    Queue
    и
    SizedQueue
    .

    В главе 14 мы перейдем от обсуждения техники программирования к решению конкретных задач, а именно сценариев системного администрирования.


    Примечания:



    1

    Огромное спасибо (яп.)



    15

    Пер. М.Кузмина. — Прим.ред.



    16

    Пер. С. Маршака. — Прим. ред.









    Главная | В избранное | Наш E-MAIL | Добавить материал | Нашёл ошибку | Наверх