|
||||
|
Глава 20. Распределенный Ruby
В настоящее время имеется немало технологий, поддерживающих распределенную обработку: различные варианты RPC, а также COM, CORBA, DCE и Java RMI. Одни проще, другие сложнее, но в принципе все делают одно и то же - предоставляют относительно прозрачный способ связи между находящимися в сети объектами так, чтобы с удаленными объектами можно было работать, как с локальными. Зачем это вообще может понадобиться? Причин много. Например, чтобы распределить некоторую вычислительную задачу между многими процессорами. Примером может послужить программа SETI@home, которая использует ваш ПК для обработки небольших объемов данных в поисках внеземного разума (кстати, эта программа не является проектом института SETI). Другой пример — привлечение широких масс к взлому шифра RSA129 (эта попытка увенчалась успехом несколько лет назад). Существует очень много задач, которые можно разбить на небольшие части, пригодные для распределенного решения. Можно также представить себе, что вы хотите предоставить интерфейс к некоему сервису, не раскрывая исходных текстов. Часто это делается с помощью Web-приложений, но из-за отсутствия состояния в протоколе HTTP это не всегда удобно (есть и другие недостатки). Механизм распределенного программирования позволяет решать подобные задачи более естественно. В мире Ruby ответом на этот вызов стала программа drb, написанная Масатоси Секи (Masatoshi Seki); еще ее название записывают так: DRb. Существуют и другие способы распределенной обработки на Ruby, но drb, пожалуй, самый легкий. Здесь нет сложных служб разрешения имен, как в CORBA. Это всего лишь простая и удобная библиотека, предоставляющая всю необходимую функциональность. В данной главе мы рассмотрим основы работы как с ней самой, так и с надстроенной над ней системой Rinda. 20.1. Обзор: библиотека drbБиблиотека drbсостоит из двух основных частей: серверной и клиентской. Грубую границу между ними можно провести следующим образом: Сервер: • запускает TCPServer и начинает прослушивать порт; • привязывает объект к экземпляру сервера drb; • принимает запросы на соединение от клиентов и отвечает на их сообщения; • дополнительно может предоставлять контроль доступа (безопасность). Клиент: • устанавливает соединение с сервером; • привязывает локальный объект к удаленному экземпляру сервера; • посылает сообщения серверу и получает ответы. Метод класса start_serviceотвечает за запуск TCP-сервера, прослушивающего указанный порт. Он принимает два параметра: URI (универсальный идентификатор ресурса), задающий порт (если он равен nil, то порт выбирается динамически), и объект, к которому мы хотим привязаться. Этот объект будет доступен удаленному клиенту, который сможет вызывать его методы, как если бы объект был локальным. require "drb" myobj = MyServer.new DRb.start_service("druby://:1234", myobj) # Порт 1234. # ... Если порт выбирается динамически, то для получения полного URI, включающего и номер порта, можно воспользоваться методом класса uri. DRb.start_service(nil, myobj) myURI = DRb.uri # "druby://hal9000:2001" Поскольку drb—многопоточная программа, любое серверное приложение должно выполнять joinв потоке сервера (чтобы не дать приложению завершиться преждевременно и тем самым уничтожить выполняющийся поток). # Предотвратить преждевременный выход. DRb.thread.join На стороне клиента мы вызываем метод start_serviceбез параметров и с помощью класса DRbObjectсоздаем локальный объект, соответствующий удаленному. Обычно первым параметром методу DRbObject.newпередается nil. require "drb" DRb.start_service obj = DRbObject.new(nil, "druby://hal9000:2001") # Сообщения, передаваемые obj, перенаправляются # удаленному объекту на стороне сервера... Следует подчеркнуть, что на стороне сервера привязка осуществляется к единственному объекту, который должен отвечать на все получаемые запросы. Если клиентов несколько, то объект должен быть безопасным относительно потоков, чтобы не оказаться в некорректном состоянии. (Для совсем простых или узкоспециализированных приложений это может быть и необязательно.) Мы не можем вдаваться в технические детали. Но имейте в виду, что если клиент читает или изменяет внутреннее состояние удаленного объекта, то при наличии нескольких клиентов возможна интерференция. Во избежание таких неприятностей мы рекомендуем применять механизмы синхронизации, например класс Mutex. (Подробнее о потоках и синхронизации рассказывается в главе 13.) Скажем хотя бы несколько слов о безопасности. Ведь не всегда желательно, чтобы с вашим сервером мог соединяться кто угодно. Помешать им пытаться вы не можете, зато можете сделать такие попытки безуспешными. В программе drbесть понятие списка контроля доступа (ACL). Это не что иное, как списки клиентов (или категорий клиентов), которым явно разрешен (или запрещен) доступ. Приведем пример. Для создания нового списка ACL мы воспользуемся классом ACL, которому передадим один или два параметра. Второй (необязательный) параметр метода ACL.newслужит для ответа на вопрос: «Мы запрещаем доступ всем клиентам, кроме некоторых, или, наоборот, разрешаем доступ всем клиентам, кроме некоторых?» По умолчанию принимается первый вариант, который обозначается константой DENY_ALLOWравной 0. Второй режим обозначается ALLOW_DENYравной 1. Первый параметр ACL.newпредставляет собой обычный массив строк, которые идут парами. Первая строка в паре должна быть равна "deny"или "allow", вторая описывает одного клиента или группу клиентов (по имени или по адресу): require "drb/acl" acl = ACL.new( %w[ deny all allow 192.168.0.* allow 210.251.121.214 allow localhost] ) Первая пара в этом примере, строго говоря, излишня, но проясняет смысл всей конструкции. А как используются ACL? Метод install_aclприводит ACL в действие. Его необходимо вызывать перед обращением к start_service, иначе он не возымеет эффекта. # Продолжение примера... DRb.install_acl(acl) DRb.start_service(nil, some_object) # ... Теперь, после запуска сервиса любой неавторизованный запрос на соединение приведет к исключению RuntimeError. Это, конечно, не все, что можно сказать о библиотеке drb. Но для обзора вполне достаточно. В следующем разделе мы рассмотрим простой drb-сервер и drb-клиент, близкие к реальным программам. А затем поговорим о программах Rinda и Ring. 20.2. Пример: эмуляция биржевой лентыВ этом примере сервер публикует в сети биржевые котировки акций. К серверу может обратиться любой клиент, желающий узнать, сколько сейчас стоит его пакет. Но мы добавили одну тонкость. Не желая следить за малейшими колебаниями цен, мы реализовали модуль Observer, который позволяет подписаться на информационный канал. Клиент следит за поступающими сведениями и предупреждает нас, когда изменение цены превысит заданный порог. Сначала рассмотрим модуль DrbObservable. Это прямолинейная реализация паттерна Observer(Наблюдатель), описанного в замечательной книге Э. Гаммы, Р. Хелма, Р. Джонсона и Дж. Влиссидеса «Паттерны проектирования» (см. сноску в разделе 12.3.1). Еще этот паттерн называют «Издатель-Подписчик». В листинге 20.1 наблюдатель определен как объект, отвечающий на вызов метода update. Сервер добавляет наблюдателей по их просьбе и посылает им уведомления, обращаясь к методу notify_observers. Листинг 20.1. Модуль DrbObservable module DRbObservable def add_observer(observer) @observer_peers ||= [] unless observer.respond_to? :update raise NameError, "наблюдатель должен отвечать на вызов 'update'" end @observer_peers.push observer end def delete_observer(observer) @observer_peers.delete observer if defined? @observer_peers end def notify_observers(*arg) return unless defined? @observer_peers for i in @observer_peers.dup begin i.update(*arg) rescue delete_observer(i) end end end end Сервер (он же канал) в листинге 20.2 эмулирует биржевые котировки с помощью последовательности псевдослучайных чисел (простите мою иронию, но это очень точно соответствует характеру рынка). Символ, идентифицирующий компанию, — всего лишь косметическое украшение, никакого реального смысла в этой программе он не имеет. При каждом изменении цены посылается уведомление всем наблюдателям. Листинг 20.2. Канал биржевых котировок (drb-сервер)require "drb" require "drb_pbserver" # Генерировать случайные котировки. class MockPrice MIN = 75 RANGE = 50 def initialize(symbol) @price = RANGE / 2 end def price @price += (rand() - 0.5)*RANGE if @price < 0 @price = -@price elsif @price >= RANGE @price = 2*RANGE - @price end MIN + @price end end class Ticker # Периодически получать котировку акций. include DRbObservable def initialize(price_feed) @feed = price_feed Thread.new { run } end def run lastPrice = nil loop do price = @feed.price print "Текущая котировка: #{price}\n" if price != lastPrice lastPrice = price notify_observers(Time.now, price) end sleep 1 end end end ticker = Ticker.new(MockPrice.new("MSFT")) DRb.start_service('druby://localhost:9001', ticker) puts 'Нажмите [return] для завершения.' gets На платформе Windows примененный способ завершения программы вызывает сложности. Функция getsв этом случае может блокировать главный поток. Если вы это видите, попробуйте вместо обращения к getsпоставить DRb.thread.join(а завершайте программу нажатием Ctrl+C). Неудивительно, что клиент (листинг 20.3) начинает с установления соединения с сервером. Он получает ссылку на объект показа котировок и устанавливает верхний и нижний пороги изменения цены. Затем клиент выводит сообщение пользователю всякий раз, как цена выходит за пределы указанного диапазона. Листинг 20.3. Наблюдатель биржевых котировок (drb-клиент)require "drb" class Warner include DRbUndumped def initialize(ticker, limit) @limit = limit ticker.add_observer(self) # Любой объект Warner # является наблюдателем. end end class WarnLow < Warner def update(time, price) # Обратный вызов наблюдателя. if price < @limit print "--- #{time.to_s}: Цена ниже #@limit: #{price}\n" end end end class WarnHigh < Warner def update(time, price) # Обратный вызов наблюдателя. if price > @limit print "+++ #{time.to_s}: Цена выше #@limit: #{price}\n" end end end DRb.start_service ticker = DRbObject.new(nil, "druby://localhost:9001") WarnLow.new(ticker, 90) WarnHigh.new(ticker, 110) puts 'Нажмите [return] для завершения.' gets Модуль DRbUndumped(см. листинге 20.3) следует включать в любой объект, который не нужно подвергать маршалингу. Самого присутствия этого модуля в числе предков объекта достаточно, чтобы drbне пытался применять к нему маршалинг. Вот исходный текст этого модуля целиком: module DrbUndumped def _dump(dummy) raise TypeError, "can't dump" end end Приложение из этого раздела достаточно содержательно, и в то же время в нем легко разобраться. Есть и другие подходы к решению подобных задач. Но способ, показанный нами, демонстрирует простоту и элегантность распределенного Ruby. 20.3. Rinda: пространство кортежей в RubyТермин «пространство кортежей» появился в 1985 году, а сама идея еще старше. Кортежем называется массив или вектор, состоящий из элементов данных (как строка в таблице базы данных). Пространство кортежей — это большое объектное пространство, наполненное кортежами, нечто вроде «информационного супа». Пока реализация пространства кортежей кажется ничем не примечательной. Но все становится гораздо интереснее, стоит лишь осознать, что к нему могут обращаться многие клиенты и доступ должен синхронизироваться. Короче говоря, это распределенная сущность; любой клиент может читать из пространства кортежей или писать в него, то есть его можно рассматривать как большое распределенное хранилище или даже способ коммуникации. Первой реализацией пространства кортежей был проект Linda — исследование в области параллельного программирования, выполненное в Йельском университете в 1980-х годах. Реализация на языке Ruby (конечно, на основе библиотеки drb), естественно, называется Rinda. Кортеж в Rinda может быть массивом или хэшем. На хэш налагается дополнительное ограничение: все ключи должны быть строками. Вот несколько примеров простых кортежей: t1 = [:add, 5, 9] t2 = [:name, :add_service, Adder.new, nil] t3 = { 'type' => 'add', 'value_1' => 5, 'value_2' => 9 } Элемент кортежа может быть произвольным объектом; это работает, потому что drbумеет выполнять маршалинг и демаршалинг объектов Ruby. (Конечно, необходимо либо включить модуль DRbUndumped, либо сделать определения объектов доступными серверу.) Пространство объектов создается методом new: require 'rinda/tuplespace' ts = Rinda::TupleSpace.new # ... Поэтому сервер выглядит так: require 'rinda/tuplespace' ts = Rinda::TupleSpace.new DRb.start_service("druby://somehost:9000", ts) gets # Нажать Enter для завершения сервера. А клиент — так: require 'rinda/tuplespace' DRb.start_service ts = DRbObject.new(nil, "druby://somehost:9000") # ... К пространству кортежей в Rinda применимы пять операций: read, read_all, write, takeи notify. Операция чтения readпозволяет получить один кортеж. Но способ идентификации кортежа не вполне очевиден: необходимо задать кортеж, соответствующий искомому; при этом nilсоответствует любому значению. t1 = ts.read [:Sum,nil] # Может извлечь, например, [:Sum, 14]. Обычно операция readблокирует выполнение программы (для синхронизации). Чтобы быстро проверить существование кортежа, можно выполнить неблокирующее чтение, задав нулевой тайм-аут: t2 = ts.read [:Result,nil],0 # Возбуждает исключение, если кортеж # не существует. Если мы точно знаем или предполагаем, что образцу будет соответствовать не один, а несколько кортежей, можно воспользоваться методом read_all, который возвращает массив: tuples = ts.read_all [:Foo, nil, nil] tuples.each do |t| # ... end Метод read_allне принимает второго параметра. Он всегда блокирует программу, если не найдено ни одного подходящего кортежа. Операция take— это чтение, за которым следует удаление. Иными словами, метод takeудаляет кортеж из пространства кортежей и возвращает его вызывающей программе: t = ts.take [:Sum, nil] # Кортежа больше нет в пространстве кортежей. Может возникнуть вопрос, почему не существует явного способа удаления. Надо полагать, что этой цели служит метод take. Метод writeпомещает кортеж в пространство кортежей. Второй параметр показывает, сколько секунд кортеж может существовать, прежде чем система сочтет, что срок его хранения истек. (По умолчанию его значение равно nil, то есть срок хранения не ограничен.) ts.write [:Add, 5, 9] # Хранить "вечно". ts.write [:Foo, "Bar"], 10 # Хранить 10 секунд. Здесь уместно будет сказать несколько слов о синхронизации. Предположим, что два клиента пытаются одновременно забрать ( take) один и тот же кортеж. Одному это удастся, а другой будет заблокирован. Если первый клиент затем изменит кортеж и запишет ( write) его обратно в хранилище, то второй получит модифицированную версию. Можно считать, что операция «обновления» — это последовательность takeи write, которая не приводит к потере данных. Конечно, как и при любом варианте многопоточного программирования, нужно позаботиться о том, чтобы не возникали тупиковые ситуации. Метод notifyпозволяет следить за пространством кортежей и получать уведомления, когда над интересующим вас кортежем была выполнена какая-то операция. Этот метод возвращает объект NotifyTemplateEntryи может наблюдать на операциями четырех видов: • write; • take; • удаление (когда истекает срок хранения кортежа); • закрытие (когда истекает срок хранения объекта NotifyTemplateEntry). Поскольку операция чтения ничего не изменяет, то система не поддерживает уведомлений о чтениях. В листинге 20.4 приведен пример использования notify. Листинг 20.4. Уведомление в системе Rindarequire 'rinda/tuplespace' ts = Rinda::TupleSpace.new alberts = ts.notify "write", ["Albert", nil] martins = ts.notify "take", ["Martin", nil] thr1 = Thread.new do alberts.each {|op,t| puts "#{op}: #{t.join(' ')}" } end thr2 = Thread.new do martins.each {|op,t| puts "#{op}: #{t.join(' ')}" } end sleep 1 ts.write ["Martin", "Luther"] ts.write ["Albert", "Einstein"] ts.write ["Martin", "Fowler"] ts.write ["Alberf, "Schweitzer"] ts.write ["Martin", "Scorsese"] ts.take ["Martin", "Luther"] # Выводится: # write: Albert Einstein # write: Albert Schweitzer # take: Martin Luther Мы видели, что readи другие операции пользуются шаблонами для сопоставления с образцами (и этим напоминают регулярные выражения). Мы уже знаем, что nilвыступает в роли метасимвола, но можно указать и класс; ему будет соответствовать любой экземпляр этого класса. tem1 = ["X", Integer] # Соответствует ["X",5], но не ["X","Files"]. tem2 = ["X", NilClass] # Соответствует литералу nil в кортеже. Кроме того, разрешается определять собственный оператор ветвящегося равенства ( ===), если вы хотите проводить сопоставление особым способом. В противном случае для сравнения будет использован стандартный оператор ===. Время жизни кортежа можно задать в момент записи. В сочетании с величинами тайм-аутов для различных операций над кортежами это позволяет ограничить время выполнения простых и более сложных манипуляций. Тот факт, что у кортежа может быть конечный срок хранения, заодно означает, что по истечении этого срока кортеж можно обновить с помощью специально написанного объекта. В библиотеке имеется готовый класс SimpleRenewer, который каждые 180 секунд обращается к drb-серверу, создавшему кортеж. Если сервер не отвечает, то кортеж удаляется. Но не пытайтесь программировать обновление, пока не освоитесь с парадигмой пространства кортежей. В листинге 20.5 приведен еще один пример работы с пространством кортежей. Он решает ту же задачу о производителе и потребителе, которая была рассмотрена в главе 13. Листинг 20.5. Задача о производителе и потребителеrequire 'rinda/tuplespace' ts = Rinda::TupleSpace.new producer = Thread.new do item = 0 loop do sleep rand(0) puts "Производитель произвел ##{item}" ts.write ["Item",item] item += 1 end end consumer = Thread.new do loop do sleep rand(0) tuple = ts.take ["Item", nil] word, item = tuple puts "Потребитель потребил ##{item}" end end sleep 60 # Работать одну минуту, потом завершиться и завершить потоки. 20.4. Обнаружение сервисов в распределенном RubyМетодика обнаружения сервисов может оказаться полезной, когда имеется много локально работающих сервисов, поскольку дает возможность находить сервис по имени. Если же число сервисов невелико и их местонахождение точно известно, особого смысла в автоматическом обнаружении нет. Раз уж вы продолжили чтение, то, наверное, хотите знать, как работает механизм обнаружения сервисов. Такую возможность предоставляет библиотека Rinda::Ring(естественно, основанная на системе Rinda). В чем-то она похожа на службу DNS; это центральная служба регистрации, где хранится информация (в виде пространства кортежей) о drb-процессах. Сервисы drbмогут по протоколу UDP найти ближайший сервер регистрации, объявить о своем присутствии или найти другие работающие поблизости сервисы. Класс Rinda::RingServerреализует сервер регистрации. Он управляет пространством имен, в котором хранится информация о местонахождении других drb-сервисов. RingServer прослушивает сеть в ожидании широковещательных UDP-пакетов с запросами о местонахождении сервера. В ответ на такой запрос он устанавливает соединение (посредством drb) с отправившим его сервисом. Пример: require 'rinda/ring' require 'rinda/tuplespace' DRb.start_service Rinda::RingServer.new(Rinda::TupleSpace.new) DRb.thread.join Класс Rinda::RingProviderрегистрирует сервис, объявляя о его присутствии серверу RingServer. При этом сообщается о типе сервиса и о фронтальном объекте, предоставляющем этот сервис, а также передается описательная информация. В следующем примере мы создаем простой сервис Adder, который складывает два числа, а потом объявляем о нем всему миру: require 'rinda/ring' class Adder include DRbUndumped def add(val1, val2) return val1 + val2 end end adder = Adder.new DRb.start_service(nil, adder) Rinda::RingProvider.new(:adder, adder, 'Simple Adder') DRb.thread.join Класс Rinda::RingFinger(названный так по аналогии с имеющейся в UNIX командой finger) применяется для обнаружения сервера RingServer. Он посылает широковещательный UDP-пакет и ждет ответа от сервера. Затем RingFinger можно использовать для поиска объявленных сервисов в пространстве кортежей. require 'rinda/ring' DRb.start_service rs = Rinda::RingFinger.primary list = [rs] + Rinda::Ringfinger.to_a svc = list.find_all [:name, :adder, nil, nil] 20.5. ЗаключениеЭта глава содержит введение в распределенный Ruby. Мы познакомились с тем, как сервис запускается и взаимодействует с клиентами, а также рассмотрели вопросы безопасности. Мы выяснили, что система Rinda может выступать в роли простого распределенного хранилища объектов, обеспечивающего синхронизацию доступа. Наконец, было показано, как можно использовать библиотеку Rinda::Ringдля обнаружения drb-сервисов. На этом рассмотрение распределенного Ruby заканчивается. Переходим к следующей теме — инструментам разработки на языке Ruby, в частности программе Rake, оболочке irb, интегрированным средам разработки (IDE) и др. |
|
||
Главная | В избранное | Наш E-MAIL | Добавить материал | Нашёл ошибку | Наверх |
||||
|