• 20.1. Обзор: библиотека drb
  • 20.2. Пример: эмуляция биржевой ленты
  • 20.3. Rinda: пространство кортежей в Ruby
  • 20.4. Обнаружение сервисов в распределенном Ruby
  • 20.5. Заключение
  • Глава 20. Распределенный Ruby

    Меньше — это больше.

    (Робер Браунинг, «Андреа дель Сарто»)

    В настоящее время имеется немало технологий, поддерживающих распределенную обработку: различные варианты RPC, а также COM, CORBA, DCE и Java RMI.

    Одни проще, другие сложнее, но в принципе все делают одно и то же - предоставляют относительно прозрачный способ связи между находящимися в сети объектами так, чтобы с удаленными объектами можно было работать, как с локальными.

    Зачем это вообще может понадобиться? Причин много. Например, чтобы распределить некоторую вычислительную задачу между многими процессорами. Примером может послужить программа SETI@home, которая использует ваш ПК для обработки небольших объемов данных в поисках внеземного разума (кстати, эта программа не является проектом института SETI). Другой пример — привлечение широких масс к взлому шифра RSA129 (эта попытка увенчалась успехом несколько лет назад). Существует очень много задач, которые можно разбить на небольшие части, пригодные для распределенного решения.

    Можно также представить себе, что вы хотите предоставить интерфейс к некоему сервису, не раскрывая исходных текстов. Часто это делается с помощью Web-приложений, но из-за отсутствия состояния в протоколе HTTP это не всегда удобно (есть и другие недостатки). Механизм распределенного программирования позволяет решать подобные задачи более естественно.

    В мире Ruby ответом на этот вызов стала программа drb, написанная Масатоси Секи (Masatoshi Seki); еще ее название записывают так: DRb. Существуют и другие способы распределенной обработки на Ruby, но drb, пожалуй, самый легкий. Здесь нет сложных служб разрешения имен, как в CORBA. Это всего лишь простая и удобная библиотека, предоставляющая всю необходимую функциональность. В данной главе мы рассмотрим основы работы как с ней самой, так и с надстроенной над ней системой Rinda.

    20.1. Обзор: библиотека drb

    Библиотека

    drb
    состоит из двух основных частей: серверной и клиентской. Грубую границу между ними можно провести следующим образом:

    Сервер:

    • запускает TCPServer и начинает прослушивать порт;

    • привязывает объект к экземпляру сервера

    drb
    ;

    • принимает запросы на соединение от клиентов и отвечает на их сообщения;

    • дополнительно может предоставлять контроль доступа (безопасность).

    Клиент:

    • устанавливает соединение с сервером;

    • привязывает локальный объект к удаленному экземпляру сервера;

    • посылает сообщения серверу и получает ответы.

    Метод класса

    start_service
    отвечает за запуск TCP-сервера, прослушивающего указанный порт. Он принимает два параметра: URI (универсальный идентификатор ресурса), задающий порт (если он равен
    nil
    , то порт выбирается динамически), и объект, к которому мы хотим привязаться. Этот объект будет доступен удаленному клиенту, который сможет вызывать его методы, как если бы объект был локальным.

    require "drb"


    myobj = MyServer.new


    DRb.start_service("druby://:1234", myobj) # Порт 1234.

    # ...

    Если порт выбирается динамически, то для получения полного URI, включающего и номер порта, можно воспользоваться методом класса

    uri
    .

    DRb.start_service(nil, myobj)

    myURI = DRb.uri # "druby://hal9000:2001"

    Поскольку

    drb
    —многопоточная программа, любое серверное приложение должно выполнять
    join
    в потоке сервера (чтобы не дать приложению завершиться преждевременно и тем самым уничтожить выполняющийся поток).

    # Предотвратить преждевременный выход.

    DRb.thread.join

    На стороне клиента мы вызываем метод

    start_service
    без параметров и с помощью класса
    DRbObject
    создаем локальный объект, соответствующий удаленному. Обычно первым параметром методу
    DRbObject.new
    передается
    nil
    .

    require "drb"


    DRb.start_service

    obj = DRbObject.new(nil, "druby://hal9000:2001")


    # Сообщения, передаваемые obj, перенаправляются

    # удаленному объекту на стороне сервера...

    Следует подчеркнуть, что на стороне сервера привязка осуществляется к единственному объекту, который должен отвечать на все получаемые запросы. Если клиентов несколько, то объект должен быть безопасным относительно потоков, чтобы не оказаться в некорректном состоянии. (Для совсем простых или узкоспециализированных приложений это может быть и необязательно.)

    Мы не можем вдаваться в технические детали. Но имейте в виду, что если клиент читает или изменяет внутреннее состояние удаленного объекта, то при наличии нескольких клиентов возможна интерференция. Во избежание таких неприятностей мы рекомендуем применять механизмы синхронизации, например класс

    Mutex
    . (Подробнее о потоках и синхронизации рассказывается в главе 13.)

    Скажем хотя бы несколько слов о безопасности. Ведь не всегда желательно, чтобы с вашим сервером мог соединяться кто угодно. Помешать им пытаться вы не можете, зато можете сделать такие попытки безуспешными.

    В программе

    drb
    есть понятие списка контроля доступа (ACL). Это не что иное, как списки клиентов (или категорий клиентов), которым явно разрешен (или запрещен) доступ.

    Приведем пример. Для создания нового списка ACL мы воспользуемся классом

    ACL
    , которому передадим один или два параметра.

    Второй (необязательный) параметр метода

    ACL.new
    служит для ответа на вопрос: «Мы запрещаем доступ всем клиентам, кроме некоторых, или, наоборот, разрешаем доступ всем клиентам, кроме некоторых?» По умолчанию принимается первый вариант, который обозначается константой
    DENY_ALLOW
    равной 0. Второй режим обозначается
    ALLOW_DENY
    равной 1.

    Первый параметр

    ACL.new
    представляет собой обычный массив строк, которые идут парами. Первая строка в паре должна быть равна
    "deny"
    или
    "allow"
    , вторая описывает одного клиента или группу клиентов (по имени или по адресу):

    require "drb/acl"

    acl = ACL.new( %w[ deny all

     allow 192.168.0.*

     allow 210.251.121.214

     allow localhost] )

    Первая пара в этом примере, строго говоря, излишня, но проясняет смысл всей конструкции.

    А как используются ACL? Метод

    install_acl
    приводит ACL в действие. Его необходимо вызывать перед обращением к
    start_service
    , иначе он не возымеет эффекта.

    # Продолжение примера...


    DRb.install_acl(acl)

    DRb.start_service(nil, some_object)

    # ...

    Теперь, после запуска сервиса любой неавторизованный запрос на соединение приведет к исключению

    RuntimeError
    .

    Это, конечно, не все, что можно сказать о библиотеке

    drb
    . Но для обзора вполне достаточно. В следующем разделе мы рассмотрим простой drb-сервер и drb-клиент, близкие к реальным программам. А затем поговорим о программах Rinda и Ring.

    20.2. Пример: эмуляция биржевой ленты

    В этом примере сервер публикует в сети биржевые котировки акций. К серверу может обратиться любой клиент, желающий узнать, сколько сейчас стоит его пакет.

    Но мы добавили одну тонкость. Не желая следить за малейшими колебаниями цен, мы реализовали модуль

    Observer
    , который позволяет подписаться на информационный канал. Клиент следит за поступающими сведениями и предупреждает нас, когда изменение цены превысит заданный порог.

    Сначала рассмотрим модуль

    DrbObservable
    . Это прямолинейная реализация паттерна
    Observer
    (Наблюдатель), описанного в замечательной книге Э. Гаммы, Р. Хелма, Р. Джонсона и Дж. Влиссидеса «Паттерны проектирования» (см. сноску в разделе 12.3.1). Еще этот паттерн называют «Издатель-Подписчик».

    В листинге 20.1 наблюдатель определен как объект, отвечающий на вызов метода update. Сервер добавляет наблюдателей по их просьбе и посылает им уведомления, обращаясь к методу

    notify_observers
    .

    Листинг 20.1. Модуль DrbObservable

    module DRbObservable


     def add_observer(observer)

      @observer_peers ||= []

      unless observer.respond_to? :update

       raise NameError, "наблюдатель должен отвечать на вызов 'update'"

      end

      @observer_peers.push observer

     end


     def delete_observer(observer)

      @observer_peers.delete observer if defined? @observer_peers

     end


     def notify_observers(*arg)

      return unless defined? @observer_peers

      for i in @observer_peers.dup

       begin

        i.update(*arg)

       rescue

        delete_observer(i)

       end

      end

     end


    end

    Сервер (он же канал) в листинге 20.2 эмулирует биржевые котировки с помощью последовательности псевдослучайных чисел (простите мою иронию, но это очень точно соответствует характеру рынка). Символ, идентифицирующий компанию, — всего лишь косметическое украшение, никакого реального смысла в этой программе он не имеет. При каждом изменении цены посылается уведомление всем наблюдателям.

    Листинг 20.2. Канал биржевых котировок (drb-сервер)

    require "drb"

    require "drb_pbserver"


    # Генерировать случайные котировки.

    class MockPrice


     MIN = 75

     RANGE = 50


     def initialize(symbol)

      @price = RANGE / 2

     end


     def price

      @price += (rand() - 0.5)*RANGE

      if @price < 0

       @price = -@price

      elsif @price >= RANGE

       @price = 2*RANGE - @price

      end

      MIN + @price

     end

    end


    class Ticker # Периодически получать котировку акций.

     include DRbObservable

     def initialize(price_feed)

      @feed = price_feed

      Thread.new { run }

     end


     def run

      lastPrice = nil

      loop do

       price = @feed.price

       print "Текущая котировка: #{price}\n"

       if price != lastPrice

        lastPrice = price

        notify_observers(Time.now, price)

       end

       sleep 1

      end

     end

    end


    ticker = Ticker.new(MockPrice.new("MSFT"))


    DRb.start_service('druby://localhost:9001', ticker)

    puts 'Нажмите [return] для завершения.'

    gets

    На платформе Windows примененный способ завершения программы вызывает сложности. Функция

    gets
    в этом случае может блокировать главный поток. Если вы это видите, попробуйте вместо обращения к
    gets
    поставить
    DRb.thread.join
    (а завершайте программу нажатием Ctrl+C).

    Неудивительно, что клиент (листинг 20.3) начинает с установления соединения с сервером. Он получает ссылку на объект показа котировок и устанавливает верхний и нижний пороги изменения цены. Затем клиент выводит сообщение пользователю всякий раз, как цена выходит за пределы указанного диапазона.

    Листинг 20.3. Наблюдатель биржевых котировок (drb-клиент)

    require "drb"


    class Warner

     include DRbUndumped


     def initialize(ticker, limit)

      @limit = limit

      ticker.add_observer(self) # Любой объект Warner

                                # является наблюдателем.

     end

    end


    class WarnLow < Warner

     def update(time, price)    # Обратный вызов наблюдателя.

      if price < @limit

       print "--- #{time.to_s}: Цена ниже #@limit: #{price}\n"

      end

     end

    end


    class WarnHigh < Warner

     def update(time, price)    # Обратный вызов наблюдателя.

      if price > @limit

       print "+++ #{time.to_s}: Цена выше #@limit: #{price}\n"

      end

     end

    end


    DRb.start_service

    ticker = DRbObject.new(nil, "druby://localhost:9001")


    WarnLow.new(ticker, 90)

    WarnHigh.new(ticker, 110)

    puts 'Нажмите [return] для завершения.'

    gets

    Модуль

    DRbUndumped
    (см. листинге 20.3) следует включать в любой объект, который не нужно подвергать маршалингу. Самого присутствия этого модуля в числе предков объекта достаточно, чтобы
    drb
    не пытался применять к нему маршалинг. Вот исходный текст этого модуля целиком:

    module DrbUndumped

    def _dump(dummy)

    raise TypeError, "can't dump"

    end

    end

    Приложение из этого раздела достаточно содержательно, и в то же время в нем легко разобраться. Есть и другие подходы к решению подобных задач. Но способ, показанный нами, демонстрирует простоту и элегантность распределенного Ruby.

    20.3. Rinda: пространство кортежей в Ruby

    Термин «пространство кортежей» появился в 1985 году, а сама идея еще старше. Кортежем называется массив или вектор, состоящий из элементов данных (как строка в таблице базы данных). Пространство кортежей — это большое объектное пространство, наполненное кортежами, нечто вроде «информационного супа».

    Пока реализация пространства кортежей кажется ничем не примечательной. Но все становится гораздо интереснее, стоит лишь осознать, что к нему могут обращаться многие клиенты и доступ должен синхронизироваться. Короче говоря, это распределенная сущность; любой клиент может читать из пространства кортежей или писать в него, то есть его можно рассматривать как большое распределенное хранилище или даже способ коммуникации.

    Первой реализацией пространства кортежей был проект Linda — исследование в области параллельного программирования, выполненное в Йельском университете в 1980-х годах. Реализация на языке Ruby (конечно, на основе библиотеки

    drb
    ), естественно, называется Rinda.

    Кортеж в Rinda может быть массивом или хэшем. На хэш налагается дополнительное ограничение: все ключи должны быть строками. Вот несколько примеров простых кортежей:

    t1 = [:add, 5, 9]

    t2 = [:name, :add_service, Adder.new, nil]

    t3 = { 'type' => 'add', 'value_1' => 5, 'value_2' => 9 }

    Элемент кортежа может быть произвольным объектом; это работает, потому что

    drb
    умеет выполнять маршалинг и демаршалинг объектов Ruby. (Конечно, необходимо либо включить модуль
    DRbUndumped
    , либо сделать определения объектов доступными серверу.)

    Пространство объектов создается методом

    new
    :

    require 'rinda/tuplespace'


    ts = Rinda::TupleSpace.new

    # ...

    Поэтому сервер выглядит так:

    require 'rinda/tuplespace'


    ts = Rinda::TupleSpace.new

    DRb.start_service("druby://somehost:9000", ts)

    gets # Нажать Enter для завершения сервера.

    А клиент — так:

    require 'rinda/tuplespace'


    DRb.start_service

    ts = DRbObject.new(nil, "druby://somehost:9000")

    # ...

    К пространству кортежей в Rinda применимы пять операций:

    read
    ,
    read_all
    ,
    write
    ,
    take
    и
    notify
    .

    Операция чтения

    read
    позволяет получить один кортеж. Но способ идентификации кортежа не вполне очевиден: необходимо задать кортеж, соответствующий искомому; при этом
    nil
    соответствует любому значению.

    t1 = ts.read [:Sum,nil] # Может извлечь, например, [:Sum, 14].

    Обычно операция

    read
    блокирует выполнение программы (для синхронизации). Чтобы быстро проверить существование кортежа, можно выполнить неблокирующее чтение, задав нулевой тайм-аут:

    t2 = ts.read [:Result,nil],0 # Возбуждает исключение, если кортеж

    # не существует.

    Если мы точно знаем или предполагаем, что образцу будет соответствовать не один, а несколько кортежей, можно воспользоваться методом

    read_all
    , который возвращает массив:

    tuples = ts.read_all [:Foo, nil, nil]

    tuples.each do |t|

    # ...

    end

    Метод

    read_all
    не принимает второго параметра. Он всегда блокирует программу, если не найдено ни одного подходящего кортежа.

    Операция

    take
    — это чтение, за которым следует удаление. Иными словами, метод
    take
    удаляет кортеж из пространства кортежей и возвращает его вызывающей программе:

    t = ts.take [:Sum, nil] # Кортежа больше нет в пространстве кортежей.

    Может возникнуть вопрос, почему не существует явного способа удаления. Надо полагать, что этой цели служит метод take.

    Метод

    write
    помещает кортеж в пространство кортежей. Второй параметр показывает, сколько секунд кортеж может существовать, прежде чем система сочтет, что срок его хранения истек. (По умолчанию его значение равно
    nil
    , то есть срок хранения не ограничен.)

    ts.write [:Add, 5, 9]      # Хранить "вечно".

    ts.write [:Foo, "Bar"], 10 # Хранить 10 секунд.

    Здесь уместно будет сказать несколько слов о синхронизации. Предположим, что два клиента пытаются одновременно забрать (

    take
    ) один и тот же кортеж. Одному это удастся, а другой будет заблокирован. Если первый клиент затем изменит кортеж и запишет (
    write
    ) его обратно в хранилище, то второй получит модифицированную версию. Можно считать, что операция «обновления» — это последовательность
    take
    и
    write
    , которая не приводит к потере данных. Конечно, как и при любом варианте многопоточного программирования, нужно позаботиться о том, чтобы не возникали тупиковые ситуации.

    Метод

    notify
    позволяет следить за пространством кортежей и получать уведомления, когда над интересующим вас кортежем была выполнена какая-то операция. Этот метод возвращает объект
    NotifyTemplateEntry
    и может наблюдать на операциями четырех видов:

    • 

    write
    ;

    • 

    take
    ;

    • удаление (когда истекает срок хранения кортежа);

    • закрытие (когда истекает срок хранения объекта

    NotifyTemplateEntry
    ).

    Поскольку операция чтения ничего не изменяет, то система не поддерживает уведомлений о чтениях. В листинге 20.4 приведен пример использования notify.

    Листинг 20.4. Уведомление в системе Rinda

    require 'rinda/tuplespace'


    ts = Rinda::TupleSpace.new


    alberts = ts.notify "write", ["Albert", nil]

    martins = ts.notify "take", ["Martin", nil]


    thr1 = Thread.new do

     alberts.each {|op,t| puts "#{op}: #{t.join(' ')}" }

    end


    thr2 = Thread.new do

     martins.each {|op,t| puts "#{op}: #{t.join(' ')}" }

    end


    sleep 1


    ts.write ["Martin", "Luther"]

    ts.write ["Albert", "Einstein"]

    ts.write ["Martin", "Fowler"]

    ts.write ["Alberf, "Schweitzer"]

    ts.write ["Martin", "Scorsese"]

    ts.take ["Martin", "Luther"]


    # Выводится:

    # write: Albert Einstein

    # write: Albert Schweitzer

    # take: Martin Luther

    Мы видели, что

    read
    и другие операции пользуются шаблонами для сопоставления с образцами (и этим напоминают регулярные выражения). Мы уже знаем, что
    nil
    выступает в роли метасимвола, но можно указать и класс; ему будет соответствовать любой экземпляр этого класса.

    tem1 = ["X", Integer]  # Соответствует ["X",5], но не ["X","Files"].

    tem2 = ["X", NilClass] # Соответствует литералу nil в кортеже.

    Кроме того, разрешается определять собственный оператор ветвящегося равенства (

    ===
    ), если вы хотите проводить сопоставление особым способом. В противном случае для сравнения будет использован стандартный оператор
    ===
    .

    Время жизни кортежа можно задать в момент записи. В сочетании с величинами тайм-аутов для различных операций над кортежами это позволяет ограничить время выполнения простых и более сложных манипуляций.

    Тот факт, что у кортежа может быть конечный срок хранения, заодно означает, что по истечении этого срока кортеж можно обновить с помощью специально написанного объекта. В библиотеке имеется готовый класс

    SimpleRenewer
    , который каждые 180 секунд обращается к drb-серверу, создавшему кортеж. Если сервер не отвечает, то кортеж удаляется. Но не пытайтесь программировать обновление, пока не освоитесь с парадигмой пространства кортежей.

    В листинге 20.5 приведен еще один пример работы с пространством кортежей. Он решает ту же задачу о производителе и потребителе, которая была рассмотрена в главе 13.

    Листинг 20.5. Задача о производителе и потребителе

    require 'rinda/tuplespace'


    ts = Rinda::TupleSpace.new


    producer = Thread.new do

     item = 0

     loop do

      sleep rand(0)

      puts "Производитель произвел ##{item}"

      ts.write ["Item",item]

      item += 1

     end

    end


    consumer = Thread.new do

     loop do

      sleep rand(0)

      tuple = ts.take ["Item", nil]

      word, item = tuple

      puts "Потребитель потребил ##{item}"

     end

    end


    sleep 60 # Работать одну минуту, потом завершиться и завершить потоки.

    20.4. Обнаружение сервисов в распределенном Ruby

    Методика обнаружения сервисов может оказаться полезной, когда имеется много локально работающих сервисов, поскольку дает возможность находить сервис по имени. Если же число сервисов невелико и их местонахождение точно известно, особого смысла в автоматическом обнаружении нет.

    Раз уж вы продолжили чтение, то, наверное, хотите знать, как работает механизм обнаружения сервисов. Такую возможность предоставляет библиотека

    Rinda::Ring
    (естественно, основанная на системе Rinda). В чем-то она похожа на службу DNS; это центральная служба регистрации, где хранится информация (в виде пространства кортежей) о
    drb
    -процессах. Сервисы
    drb
    могут по протоколу UDP найти ближайший сервер регистрации, объявить о своем присутствии или найти другие работающие поблизости сервисы.

    Класс

    Rinda::RingServer
    реализует сервер регистрации. Он управляет пространством имен, в котором хранится информация о местонахождении других drb-сервисов. RingServer прослушивает сеть в ожидании широковещательных UDP-пакетов с запросами о местонахождении сервера. В ответ на такой запрос он устанавливает соединение (посредством
    drb
    ) с отправившим его сервисом. Пример:

    require 'rinda/ring'

    require 'rinda/tuplespace'


    DRb.start_service

    Rinda::RingServer.new(Rinda::TupleSpace.new)


    DRb.thread.join

    Класс

    Rinda::RingProvider
    регистрирует сервис, объявляя о его присутствии серверу RingServer. При этом сообщается о типе сервиса и о фронтальном объекте, предоставляющем этот сервис, а также передается описательная информация. В следующем примере мы создаем простой сервис
    Adder
    , который складывает два числа, а потом объявляем о нем всему миру:

    require 'rinda/ring'


    class Adder

     include DRbUndumped

     def add(val1, val2)

      return val1 + val2

     end

    end


    adder = Adder.new

    DRb.start_service(nil, adder)

    Rinda::RingProvider.new(:adder, adder, 'Simple Adder')


    DRb.thread.join

    Класс

    Rinda::RingFinger
    (названный так по аналогии с имеющейся в UNIX командой
    finger
    ) применяется для обнаружения сервера RingServer. Он посылает широковещательный UDP-пакет и ждет ответа от сервера. Затем RingFinger можно использовать для поиска объявленных сервисов в пространстве кортежей.

    require 'rinda/ring'


    DRb.start_service

    rs = Rinda::RingFinger.primary

    list = [rs] + Rinda::Ringfinger.to_a

    svc = list.find_all [:name, :adder, nil, nil]

    20.5. Заключение

    Эта глава содержит введение в распределенный Ruby. Мы познакомились с тем, как сервис запускается и взаимодействует с клиентами, а также рассмотрели вопросы безопасности.

    Мы выяснили, что система Rinda может выступать в роли простого распределенного хранилища объектов, обеспечивающего синхронизацию доступа. Наконец, было показано, как можно использовать библиотеку

    Rinda::Ring
    для обнаружения drb-сервисов.

    На этом рассмотрение распределенного Ruby заканчивается. Переходим к следующей теме — инструментам разработки на языке Ruby, в частности программе Rake, оболочке

    irb
    , интегрированным средам разработки (IDE) и др.









    Главная | В избранное | Наш E-MAIL | Добавить материал | Нашёл ошибку | Наверх