dev @ purchease

comment nous codons chez purchease

..

Herr Kappelmeister

Publié par david le 25/09/2020 - architecture

Intro

L’extraction de tickets, de l’analyse d’image jusqu’au répérage des articles et des points requiert une multitudes d’étapes que nous réalisons en tâche de fond. Selon le contexte ( application d’origine, qualité du ticket, fraude éventuelle) le flow de traitement peut avoir plus ou moins d’étapes. Afin de les orchestrer nous avons développé un moyen de controller l’ensemble de ces tâches afin que toutes les applications en jeu puissent jouer ensemble la bonne partition.

RESQUE + WORKFLOW = ASYNC_WORKFLOW

L’importance du contrôle des tâches asynchrones a demandé beaucoup d’effort de design et développement que nous allons tenter de répliquer ici pas à pas.

Notre cas d’exemple

Supposons que nous avons deux applications complexes, que nous réduirons pour l’exemple à peu de code : la première donne le bulletin météo, la seconde vérifie un agenda. Notre objectif est de donner plannifier notre journée en fonction de ces deux applications.

Weather expert

Voici le code applicatif, qui nous donne le bulletin météo

# weather_export_app/wheather_export.rb

require 'date'
class CheckWeatherReport
  def self.seasons(y)
    {
      spring:  [ "#{y}-03-21",  "#{y}-06-21" ].map{|ts| Date.parse(ts) },
      summer:  [ "#{y}-06-21",  "#{y}-09-21" ].map{|ts| Date.parse(ts) },
      fall:  [ "#{y}-09-21",  "#{y}-12-21" ].map{|ts| Date.parse(ts) },
      winter:  [ "#{y}-12-21",  "#{y}-03-21" ].map{|ts| Date.parse(ts) }
    }
  end

  def self.season_report_mapping
    {
      spring:  {rain: rand > 0.3, temperature: (15 + (5 * rand - 3 )).round , snow: rand > 0.8 },
      summer:  {rain: rand > 0.8, temperature: (25 + (5 * rand - 3 )).round , snow: rand > 1 },
      fall:    {rain: rand > 0.2, temperature: (12 + (5 * rand - 3 )).round , snow: rand > 1 },
      winter:  {rain: rand > 0.9, temperature: (5 + (5 * rand - 3 )).round , snow: rand > 0.2 }
    }
  end

  def self.perform str_date
    date = Date.parse(str_date)
    y = date.strftime('%Y')
    season  = seasons(y).find{|seasons,range| (range.first..range.last).include?(date) }.first
    result = season_report_mapping[season] 
    puts "[CheckWeatherReport] #{result}"
    return result
  end
end

Nous décidons dans un premier temps d’exposer ce service sur un api avec Rack :

# weather_export_app/web_server.ru

require 'rack/app'
require_relative 'weather_expert.rb'

class WeatherExpert < Rack::App
  get 'report' do
    CheckWeatherReport.perform params['date_as_str']
  end
end

run WeatherExpert

Enfin, un script minimal pour lancer le serveur :

#!/bin/sh
rackup -p 3001 ./web_server.ru

On teste notre application :

 > curl localhost:3001/report?date_as_str=2019-04-02
 {:rain=>true, :temperature=>12., :snow=>false}

Notre service est en pleine forme !

Agenda

Construisons une seconde application ‘experte’ : celle-ci expose un service permettant de vérifier pour chaque heure de la semaine, mon activité :

# agenda_app/agenda.rb
class CheckAgenda
  def self.week_day_schedule
    {
      (7..9) => :kids,
      (9..18) => :work,
      (18..21) => :kids,
      (22..24)    => :sleep,
      (0..7)    => :sleep
    }
  end

  def self.weekend_day_schedule
    {
      (9..11) => :kids,
      (11..16) => :free,
      (16..19)  => :kids,
      (19..24) => :free,
      (0..9) => :sleep,
    }
  end

  def self.day_type ts 
    case ts.wday 
    when (1..5)
      :week_day
    when (6..7)
      :weekend_day
    end
  end

  def self.activity hour, schedule
    schedule.find{|k,v| k.include?(hour) }.last
  end

  def self.perform datetime_as_str
    ts = Time.strptime datetime_as_str, '%Y-%m-%d_%H_%S'
    meth = "#{day_type(ts)}_schedule"
    schedule = send(meth.to_sym)
    result = activity(ts.hour, schedule)
    puts "[CheckAgenda] : #{result}"
    result
  end
end

On créera de la même façon un serveur :

require 'rack/app'
require_relative 'agenda.rb'

class Agenda < Rack::App
  get 'activity' do
    CheckAgenda.perform params['datetime_as_str']
  end
end
run Agenda

Et de quoi le démarrer

#!/bin/sh
rackup -p 3002 ./web_server.ru

Utilisons les !

Notre cas d’usage est le suivant : on a un modèle d’utilisateur pouvant réaliser un certain nombre d’actions :

# scheduler.rb
require 'net/http'
# given a time stamp, what should I do ? 
# I check the wheather, my agenda, and take my decision

class User 
  def run
    puts 'let s run'
  end

  def code_for_fun
    puts 'let s code_for_fun'
  end

  def sleep
    puts 'let s sleep'
  end

  def take_care_of_kids
    puts 'lets  take care of the kids'
  end

  def work 
    puts 'lets work'
  end
end

Et un arbre de décision qui s’appuie sur les services météo et agenda pour décider de l’action :

class Scheduler
  def self.select_activity time
    ts = time.strftime("%Y-%m-%d_%H_%S")
    activity_reply = JSON.parse(Net::HTTP.get('localhost', "/activity?datetime_as_str=#{ts}",3002))
    puts activity_reply
    case activity_reply['task']
    when 'kids'
      :take_care_of_kids
    when 'sleep'
      :sleep
    when 'work'
      :work
    when  'free'
      weather_reply = JSON.parse(Net::HTTP.get('localhost', "/report?datetime_as_str=#{ts}",3001))
      puts weather_reply

      if weather_reply['rain'] || weather_reply['snow']
        :code_for_fun
      else
        :run
      end
    end
  end
  def self.activate user, time=Time.now
    activity = select_activity time
    user.send activity
  end
end

Scheduler.activate User.new
# {"task"=>"free"}
# {"rain"=>true, "temperature"=>10, "snow"=>false}
# let s code_for_fun

Les problèmes arrivent

Bien, le problème est posé “et la question est vide répondu”. Maintenant, que ce passe-t-il si la question n’est pas “vite répondue” ? Imaginons que notre service météo s’appuie sur un modèle de forecast extremement complexe, qu’il doive récupérer des images satellites au lieu de faire une recherche dans un hash. Imaginons que notre Agenda consulte réellement un agenda, essaie de contacter les assistants personnels virtuels de nos rendez-vous ou qu’il faille vérifier les horaires de la baby-sitter par sms ? Notre décision ne peut plus être rendue ‘live’.

“Resquousse” : Resque à notre rescousse

Première chose, on va sortir nos jobs trop lourds du chemin d’exécution synchrone sur l’api. Utilisons Resque pour faire en sorte que nos jobs s’exécutent en tâche de fond. Définissons, pour chaque app le process de jobs asynchrones. Pour cela, nous allons avoir besoin d’un Rakefile, invoquant resque en lui précisant notre code applicatif.

# weather_expert_app/Rakefile
require 'resque/tasks'
namespace :resque do
  task :setup do
    require_relative './weather_expert.rb'
  end
end

La classe exécute le code applicatif n’est que très légèrement altérée : on précise une file d’attente sur laquelle les jobs de l’app vont être placés et sur laquelle le worker va écouter. Ca sera le seul changement puisque la class avait été écrite dans l’optique d’être exécutable en tant que job Resque avec son self.perform.

# weather_export_app/wheather_export.rb

require 'time'

class CheckWeatherReport
  @queue=:weather_background
#...
end

Coté serveur web, on ne va pas changer grand chose non plus :

# weather_export_app/web_server.rb
require 'rack/app'
require_relative 'weather_expert.rb'
require 'json'
require 'resque'

class WeatherExpert < Rack::App
  get 'report' do
    #reply  = CheckWeatherReport.perform params['datetime_as_str']
    reply = Resque.enqueue CheckWeatherReport, params['datetime_as_str']
    reply.to_json
  end

end
run WeatherExpert

Sur le service, au lieu d’appeler la classe qui exécute le job, on met le job en file d’attente.

Enfin, il nous faut lancer le worker Resque en plus du serveur : on lance en background la tâche rake correspondant au service de Resque, en lui précisant la queue sur laquelle écouter :

#!/bin/sh
cd "$(dirname "$0")"

QUEUE=weather_background rake resque:work &
rackup -p 3001 ./web_server.ru
trap 'kill $(jobs -p)' EXIT

On transformera l’application agenda de la même façon.

Si on relance notre application et qu’on l’appelle :

weather_reply = JSON.parse(Net::HTTP.get('localhost', "/report?datetime_as_str=#{ts}",3001))
> true

La réponse que l’on obtient est le résultat de la mise en file d’attente du job. Si on regarde les logs du worker de l’application, on verra passer :

[CheckWeatherReport] {:rain=>false, :temperature=>14, :snow=>false}

La tâche a bien été exécutée par le worker asynchrone !

Une solution encore bancale

Très bien nos tâches peuvent durer des heures, l’api des applications n’en souffrira pas. Mais, comment j’obtiens mon résultat maintenant que le travail est fait en tâche de fond ? Et comment appliquer ma logique qui est conditionnée par les deux tâches ? On peut faire en sorte que les tâches rappelent mon applicatif principal lorsqu’elles sont exécutées. On pourra alors appeler la tâche suivante. Mais on pourrait bien avoir à réappliquer souvent cette logique. Tentons d’écrire un middleware qui constituerait une couche d’abstraction gérant la communication afin de se consacrer sur la logique elle-même.

Ce que l’on veut

final_design

Explications :

Le traitement que l’on souhaite effectuer requiert l’exécution de tâches complexes qui peuvent avoir lieu sur l’application 1 et 2. Ces applications disposent toutes deux de leur DB applicative et d’une DB redis gérant leur tâches asynchrone.

On va définir l’enchainement des tâches dans l’application ‘KappelMeister’ qui veillera à ce que l’ordonnancement que l’on a planifié se déroule tel qu’on le décide. Pour cela, il doit être notifié de la fin des tâches pour pouvoir passer à la suite, ce qui est le point clé dans l’ordonnancement des tâches en background.

Nous souhaitons que le code soit partagé, afin que le protocole propre à cette tuyauterie soit distribué facilement entre les acteurs.

La partie abstraite

Tentons de construire le protocole à distribuer sur nos trois apps. Comme le flow de traitement peut être compliqué, utilisons un automate fini. AASM est super bien, mais nous allons choisir celle issue de la gem ‘workflow’. L’idée est la suivante : l’automate est joué sur l’application KapellMeister. Quand il fait un pas, il execute la tâche correspondante, en passant l’ordre à l’application cible. Elle persiste son état en attendant d’être réveillée. Quand l’application cible a terminé, elle doit notifier l’application KapellMeister pour qu’elle poursuive l’automate. Ecrivons le début de la couche abstraite. Nous aurons besoin d’une définition d’un workflow, d’un controlleur qui les gérera, et de la définition des messages. Enfin nous aurons besoin de définir les ‘Agents’ : ce sont les tâches qui seront executées par les applications clientes.

Commençons par créer une classe de controller qui se chargera de gérér nos workflows. Elle doit pouvoir :


Définissons maintenant le protocole d'echange entre agents et chef d'ochestre. On va pour cela mettre dans une classe de message les actions et paramètres permettant la réalisation des cas d'usage listés plus haut
```ruby

class WorkflowPayload
    attr_accessor :type, :agent,  :agent_args, :callback, :reply, :wk_key, :creation_params, :wf_klass

    def initialize h
      @type = h['type']
      @agent = h['agent']
      @agent_args = h['agent_args']
      @callback =  h['callback']
      @reply =  h['reply']
      @wk_key = h['wk_key']
      @wf_klass = h['wf_klass']
      @creation_params= h['creation_params']
    end


    def self.build_for_post_creation wf_name, params
      {
        type: 'post_create',
        wf_klass: wf_name,
        creation_params: params
      }
    end


    def self.build_for_creation wf_name, params
      {
        type: 'create',
        wf_klass: wf_name,
        creation_params: params
      }
    end

    def self.build_for_agent_call wk_key, agent_name, agent_args, callback
        pl = new({})
        pl.type = 'agent_call'
        pl.agent = agent_name
        pl.agent_args = agent_args
        pl.wk_key = wk_key
        pl.callback = callback
        pl
    end

    def self.build_for_agent_reply wk_key, callback, reply
        pl = new( {})
        pl.type = 'agent_reply'
        pl.callback = callback
        pl.reply = reply
        pl.wk_key = wk_key
        pl
    end

    def to_h
      {
        type: @type,
        agent: @agent,
        agent_args: @agent_args, 
        callback: @callback,
        reply: @reply,
        wk_key: @wk_key,
        wf_klass: @wf_klass,
        creation_params: @creation_params
      }
    end

end

Il nous faudra également la class abstraite de workflow :

class BaseWorkflow 
  include Workflow
  attr_accessor :id

  def initialize params
  end

  def key
    "#{self.class.name}::#{id}"
  end

  def self.decode_key wf_key
    wf_key.split('::')
  end

  def to_h
    hash_data = {}
    instance_variables.each do |var|
      tuned_var = var.to_s.sub(/^@*/, '')
      hash_data[tuned_var] = instance_variable_get var
    end
    hash_data
  end

  def self.from_hash(hash_data)
    instance = new({})
    hash_data.each do |var, val|
      instance.instance_variable_set "@#{var}", val
    end
    instance
  end

  def self.from_json(string_data)
    from_hash(JSON.parse(string_data))
  end

  def self.register_agent agent_name, opts
    WorkflowController.register_agent agent_name, opts
  end

  def call_agent agent_name, agent_args, call_back
    WorkflowController.call_agent self,  agent_name, agent_args, call_back
  end
end

Et enfin, nous allons définir une class de Worker Resque génrérique qui sera capable d’agir en fonction du message recu :




class WorkflowWorker

    def self.perform pl_message
      begin 
        a_log "dequeued #{pl_message}"
        message = WorkflowPayload.new pl_message
        case message.type
        when 'post_create'
          WorkflowController.post_message message
        when 'create'
          wf = WorkflowController.create_workflow message.wf_klass, message.creation_params
        when 'agent_call'
          klass = Module.const_get message.agent
          result = klass.perform message.agent_args
          pl_reply = WorkflowPayload.build_for_agent_reply  message.wf_key, message.callback, result
          WorkflowController.post_message pl_reply
        when 'agent_reply'
          wf = WorkflowController.update_workflow_with_reply message
        end
      rescue Exception => e
        a_log e.message
        puts e.backtrace
        raise e
      end

    end
end

L’intérêt de ce worker est qu’il est le pivot de toutes les opérations : il va être appelé coté KappellMeister pour gérer les créations d’instance de workflow et coté applicatif pour gérer l’appel aux agents et le protocole de réponse. En outre on exécute toute la logique elle-même en tâche de fond. Les appels sur les apis se limitent à mettre ce job en file d’attente. L’un des gros intérêt est que si un appel échoue ( par exemple un appel permettant de transférer un message ), on bénéficie de resque-cleaner pour les traquer et les rejouer ; même si une api est down, on pourra au pire rejouet l’appel !

Service privé

En plus de toutes nos classes abstraites, on va définir notre api privée définissant le protocole :

# async_workflow/rack_app.rb
require 'rack/app'
require_relative 'async_workflow.rb'
require 'resque'
#ASYNC_WF_API_PATH = 'async_workflow'

class WorflowAgentApp < Rack::App

  payload do
    parser do
      accept :json
      reject_unsupported_media_types
    end
  end

  post '/call_agent' do
    a_log "received #{payload}"
    wf_payload = WorkflowPayload.new payload
    Resque.enqueue WorkflowWorker wf_payload.to_h
  end

  post '/post_reply' do
    a_log "received #{payload}"
    Resque.enqueue_to :kappell, WorkflowWorker, wf_payload.to_h
  end

  post '/post_create' do
    a_log "received #{payload}"
    wf_payload = WorkflowPayload.new payload
    wf_payload.type = :create
    Resque.enqueue_to :kappell, WorkflowWorker, wf_payload.to_h
  end
end

… et on pourra grâce à la magie de Rack la déployer sur toutes nos applications.

Coté applicatif

On inclut dans l’application la dépendance à notre librairie. On monte le point d’api ‘privée’.

  mount WorflowAgentApp

Sur l’appli principale, on modifie notre point d’entrée : Traiter notre tâche compliquée revient à la déléguer au workflow !

  get 'async_activate' do
    message = WorkflowPayload.build_for_post_creation 'ActivityWorkflow', {name: params['user_name']}
    Resque.enqueue_to(:main, WorkflowWorker, message.to_h)
  end

Conductor side

Nous n’avons pas encore créé nontre nouvelle application qui va orchester tout ce travail.

Il lui faut : une api qui expose les services de notre protocole :

# web server for kapel meister
require_relative '../async_workflow/rack_app'
require_relative './kappellmeister.rb'
class KappelApi < Rack::App
  mount WorflowAgentApp
end
run KappelApi

Il lui faut un worker.

Et bien sûr ….la défintion de notre workflow :

require_relative '../async_workflow/async_workflow.rb'

class ActivityWorkflow < BaseWorkflow
  attr_accessor :name

  def initialize params
    @name = params['name']
    @id = name
  end

  workflow do
    state :created do
      event :start, transitions_to: :checking_agenda
    end

    state  :checking_agenda do
      event :notify_activity, transitions_to: :notifying_activity
      event :weather_report_needed, transitions_to: :checking_weather
    end

    state  :checking_weather do
      event :notify_activity, transitions_to: :notifying_activity
    end

    state :notifying_activity do
      event :notified, transitions_to: :finished
    end

  end

  register_agent 'CheckWeatherReport', url: 'localhost:3001'
  register_agent 'CheckAgenda', url: 'localhost:3002'
  register_agent 'UpdateUserActivity', url: 'localhost:3000'

  def on_checking_agenda_entry(_prior_state, _triggering_event, *_event_args)
    call_agent 'CheckAgenda', {date: Date.today.to_s }, :update_with_schedule
  end

  def on_checking_weather_entry(_prior_state, _triggering_event, *_event_args)
    call_agent 'CheckWhetherReport', {date: Date.today.to_s }, :update_with_weather
  end

  def on_notifying_activity_entry(_prior_state, _triggering_event, *_event_args)
    call_agent 'UpdateUserActivity', {user_name: @user_name }, :notified
  end

  def update_with_schedule! agent_reply
    if agent_reply[:task] != 'free'
      @activity = agent_reply[:task]
      notify_activity!
    else
      weather_report_needed!
    end
  end

  def update_with_weather agent_reply
    if agent_reply['rain'] || agent_reply['snow']
      @activity = code_for_fun
    else
      @activity = run
    end
    notify_activity!
  end
end

Il hérite de notre classe abstraite et definit sa propre logique d’enchainement des opérations. On s’appuis pour cela sur l’élégance de la gem workflow. Toute la tuyauterie est déléguée à la lib, et il ne nous reste que la partie fonctionnelle de l’ensemble des tâches.

Pour aller plus loin

En plus de rendre le code plus robuste que celui présenté ici, il faudra également ajouter un lock sur les clés de stockage des workflow et assurer que le worker a toujours le lock avant de modifier quoique ce soit afin de garantir l’intégrité du déroulement.

On aura également envie d’utiliser des mutations au lieu des classes de worker de base.



COMMENTAIRES