Class: Orocos::RemoteProcesses::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/orocos/remote_processes/client.rb

Overview

Client-side API to a Server instance

Process servers allow starting, stopping and monitoring processes on remote machines. Instances of this class provide access to remote process servers.

Defined Under Namespace

Classes: Failed, StartupFailed, TimeoutError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = DEFAULT_PORT, options = Hash.new) ⇒ Client

Connects to the process server at host:port

Options Hash (options):

  • :name_service (Orocos::NameService) — default: Orocos.name_service

    . The name service object that should be used to resolve tasks started by this process server

  • :root_loader (OroGen::Loaders::Base) — default: Orocos.default_loader

    . The loader object that should be used as root for this client's loader


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/orocos/remote_processes/client.rb', line 64

# Connects to the process server at host:port
#
# @param host the name or address of the machine running the process server
# @param port the TCP port the process server listens on
# @param [Hash] options
# @option options [Orocos::NameService] :name_service (Orocos.name_service)
#   the name service object that should be used to resolve tasks started
#   by this process server
# @option options [OroGen::Loaders::Base] :root_loader (Orocos.default_loader)
#   the loader object that should be used as root for this client's loader
# @raise [Errno::ECONNREFUSED] if the connection to host:port is refused
# @raise [StartupFailed] if the connection reaches EOF while the server PID
#   is being read
def initialize(host = 'localhost', port = DEFAULT_PORT, options = Hash.new)
    @host = host
    @port = port
    @socket =
        begin TCPSocket.new(host, port)
        rescue Errno::ECONNREFUSED => e
            raise e.class, "cannot contact process server at '#{host}:#{port}': #{e.message}"
        end

    initialized = false
    begin
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
        # Mark the socket close-on-exec so that it is not inherited by
        # processes spawned from this one. Note that
        # fcntl(Fcntl::FD_CLOEXEC, 1) does NOT achieve this: it passes the
        # flag where the fcntl command (F_SETFD) is expected.
        socket.close_on_exec = true

        options = Kernel.validate_options options,
            :name_service => Orocos.name_service,
            :root_loader => Orocos.default_loader
        @name_service = options[:name_service]

        begin
            # First exchange with the server: an EOF here means that the
            # connection was closed before the server replied
            @server_pid = pid
        rescue EOFError
            raise StartupFailed, "process server failed at '#{host}:#{port}'"
        end

        @loader = Loader.new(self, options[:root_loader])
        @root_loader = loader.root_loader
        @processes = Hash.new
        @death_queue = Array.new
        @host_id = "#{host}:#{port}:#{server_pid}"
        initialized = true
    ensure
        # The caller never gets a reference to a half-built client, so
        # release the connection ourselves if anything above raised
        socket.close unless initialized
    end
end

Instance Attribute Details

#available_deploymentsObject (readonly)

Mapping from deployment names to the corresponding orogen project name. It lists the deployments that are available on the remote process server.


31
32
33
# File 'lib/orocos/remote_processes/client.rb', line 31

# Mapping from deployment names to the corresponding orogen project name,
# for the deployments available on the remote process server
#
# @return the current value of @available_deployments
def available_deployments; @available_deployments end

#available_projectsObject (readonly)

Mapping from orogen project names to the corresponding content of the orogen files. These projects are the ones available to the remote process server


27
28
29
# File 'lib/orocos/remote_processes/client.rb', line 27

# Mapping from orogen project names to the corresponding content of the
# orogen files, for the projects available to the remote process server
#
# @return the current value of @available_projects
def available_projects; @available_projects end

#available_typekitsObject (readonly)

Mapping from deployment names to the corresponding XML type registry for the typekits available on the process server


34
35
36
# File 'lib/orocos/remote_processes/client.rb', line 34

# Mapping to the XML type registry of the typekits available on the
# process server
#
# @return the current value of @available_typekits
def available_typekits; @available_typekits end

#hostObject (readonly)

The hostname we are connected to


40
41
42
# File 'lib/orocos/remote_processes/client.rb', line 40

# The hostname we are connected to
#
# @return the current value of @host
def host; @host end

#host_idObject (readonly)

A string that uniquely identifies this process server


46
47
48
# File 'lib/orocos/remote_processes/client.rb', line 46

# A string that identifies this process server
#
# @return the current value of @host_id
def host_id; @host_id end

#loaderLoader (readonly)

The loader object that allows to access models from the remote server


19
20
21
# File 'lib/orocos/remote_processes/client.rb', line 19

# The loader object that allows to access models from the remote server
#
# @return [Loader] the current value of @loader
def loader; @loader end

#name_serviceObject (readonly)

The name service object used to resolve tasks from this process server


49
50
51
# File 'lib/orocos/remote_processes/client.rb', line 49

# The name service object used to resolve tasks from this process server
#
# @return the current value of @name_service
def name_service; @name_service end

#portObject (readonly)

The port we are connected to on the host


42
43
44
# File 'lib/orocos/remote_processes/client.rb', line 42

# The port we are connected to on the host
#
# @return the current value of @port
def port; @port end

#processesObject (readonly)

Mapping from a deployment name to the corresponding RemoteProcess instance, for processes that have been started by this client.


37
38
39
# File 'lib/orocos/remote_processes/client.rb', line 37

# Mapping from a name to the corresponding process object, for processes
# that have been started by this client
#
# @return the current value of @processes
def processes; @processes end

#root_loaderOroGen::Loaders::Base (readonly)

The root loader object


22
23
24
# File 'lib/orocos/remote_processes/client.rb', line 22

# The root loader object
#
# @return [OroGen::Loaders::Base] the current value of @root_loader
def root_loader; @root_loader end

#server_pidObject (readonly)

The PID of the server process


44
45
46
# File 'lib/orocos/remote_processes/client.rb', line 44

# The PID of the server process
#
# @return the current value of @server_pid
def server_pid; @server_pid end

#socketObject (readonly)

The socket instance used to communicate with the server


16
17
18
# File 'lib/orocos/remote_processes/client.rb', line 16

# The socket instance used to communicate with the server
#
# @return the current value of @socket
def socket; @socket end

Instance Method Details

#create_log_dir(log_dir, time_tag, metadata = Hash.new) ⇒ Object

Creates a new log dir, and saves the given time tag in it (used later on by save_log_dir)


201
202
203
204
# File 'lib/orocos/remote_processes/client.rb', line 201

# Requests the creation of a new log dir on the process server
#
# The request is COMMAND_CREATE_LOG followed by the marshalled
# [log_dir, time_tag, metadata] array. No reply is read here.
#
# @param log_dir the log directory to create
# @param time_tag the time tag to save in it (used later on by
#   #save_log_dir)
# @param [Hash] metadata additional data sent along with the request
# @return the socket, as returned by Marshal.dump
def create_log_dir(log_dir, time_tag, metadata = Hash.new)
    socket.write(COMMAND_CREATE_LOG)
    Marshal.dump([log_dir, time_tag, metadata], socket)
end

#disconnectObject


115
116
117
# File 'lib/orocos/remote_processes/client.rb', line 115

# Closes the socket used to communicate with the process server
def disconnect
    connection = socket
    connection.close
end

#infoObject


107
108
109
110
111
112
113
# File 'lib/orocos/remote_processes/client.rb', line 107

# Sends COMMAND_GET_INFO to the process server and returns its reply
#
# @return the object unmarshalled from the server's reply
# @raise [RuntimeError] if the socket does not become readable within 2
#   seconds
def info
    socket.write(COMMAND_GET_INFO)
    reply_ready = select([socket], [], [], 2)
    unless reply_ready
        raise "timeout while reading process server at '#{host}:#{port}'"
    end
    # NOTE(review): Marshal.load trusts whatever the peer sends; only
    # connect to process servers you trust
    Marshal.load(socket)
end

#inspectObject


54
# File 'lib/orocos/remote_processes/client.rb', line 54

# Same representation as #to_s
#
# @return [String]
def inspect
    to_s
end

#join(deployment_name) ⇒ Object


263
264
265
266
267
268
269
270
271
# File 'lib/orocos/remote_processes/client.rb', line 263

# Blocks until the process registered under the given name has terminated
#
# Returns immediately if no process is registered in #processes under
# that name.
#
# @param deployment_name the key of the process in #processes
# @return [nil]
# @raise [Orocos::ComError] if the connection to the process server
#   reaches EOF before the termination is reported
def join(deployment_name)
    process = processes[deployment_name]
    return unless process

    loop do
        result = wait_termination(nil)
        # Test for the key, not the value: a falsy status must not make us
        # wait forever
        return if result.key?(process)

        # #wait_termination returns an empty hash on EOF without blocking.
        # Without this check, a closed connection turns this loop into a
        # never-ending busy loop.
        if result.empty? && socket.eof?
            raise Orocos::ComError, "lost connection to process server #{self} while waiting for #{deployment_name} to terminate"
        end
    end
end

#pidObject


95
96
97
98
99
100
101
102
103
104
105
# File 'lib/orocos/remote_processes/client.rb', line 95

# Returns the PID of the process server
#
# The value is requested from the server with COMMAND_GET_PID on the
# first call, and cached in @server_pid afterwards.
#
# @return [Integer]
# @raise [RuntimeError] if the socket does not become readable within 2
#   seconds
def pid
    return @server_pid if @server_pid

    socket.write(COMMAND_GET_PID)
    unless select([socket], [], [], 2)
        raise "timeout while reading process server at '#{host}:#{port}'"
    end
    # The PID is the first element of the marshalled reply
    reply = Marshal.load(socket)
    @server_pid = Integer(reply.first)
end

#queue_death_announcementObject


206
207
208
# File 'lib/orocos/remote_processes/client.rb', line 206

# Reads one marshalled object from the socket and appends it to
# @death_queue
#
# @return [Array] the death queue
def queue_death_announcement
    announcement = Marshal.load(socket)
    @death_queue.push(announcement)
end

#quit_serverObject


273
274
275
# File 'lib/orocos/remote_processes/client.rb', line 273

# Sends COMMAND_QUIT to the process server
#
# @return the result of the socket write
def quit_server
    command = COMMAND_QUIT
    socket.write(command)
end

#save_log_dir(log_dir, results_dir) ⇒ Object

Requests that the process server moves the log directory at log_dir to results_dir


194
195
196
197
# File 'lib/orocos/remote_processes/client.rb', line 194

# Requests that the process server moves the log directory at log_dir to
# results_dir
#
# The request is COMMAND_MOVE_LOG followed by the marshalled
# [log_dir, results_dir] pair. No reply is read here.
#
# @return the socket, as returned by Marshal.dump
def save_log_dir(log_dir, results_dir)
    socket.write(COMMAND_MOVE_LOG)
    request = [log_dir, results_dir]
    Marshal.dump(request, socket)
end

#start(process_name, deployment, name_mappings = Hash.new, options = Hash.new) ⇒ Object

Starts the given deployment on the remote server, without waiting for it to be ready.

Returns a RemoteProcess instance that represents the process on the remote side.

Raises Failed if the server reports a startup failure


158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/orocos/remote_processes/client.rb', line 158

# Starts the given deployment on the remote server, without waiting for
# it to be ready.
#
# @param process_name the name under which the process is registered in
#   #processes
# @param deployment either a deployment name (anything responding to
#   #to_str), resolved through the root loader, or a deployment model
# @param [Hash] name_mappings explicit name mappings; they take precedence
#   over the mappings derived from the :prefix option
# @param [Hash] options options forwarded to the server. The :prefix
#   entry is consumed locally and not forwarded. The hash passed by the
#   caller is left untouched.
# @return the process object that represents the process on the remote side
# @raise [ArgumentError] if this client already started a process called
#   process_name
# @raise [OroGen::DeploymentModelNotFound] if the deployment is given by
#   name and the remote loader does not have it
# @raise [Failed] if the server reports a startup failure
# @raise [InternalError] if the server reply is not a known one
def start(process_name, deployment, name_mappings = Hash.new, options = Hash.new)
    if processes[process_name]
        raise ArgumentError, "this client already started a process called #{process_name}"
    end

    if deployment.respond_to?(:to_str)
        deployment_model = loader.root_loader.deployment_model_from_name(deployment)
        unless loader.has_deployment?(deployment)
            raise OroGen::DeploymentModelNotFound, "deployment #{deployment} exists locally but not on the remote process server #{self}"
        end
    else
        deployment_model = deployment
    end

    # Work on a copy: removing :prefix from the caller's hash would make a
    # reused options hash lose its prefix after the first call
    options = options.dup
    prefix_mappings = Orocos::ProcessBase.resolve_prefix(deployment_model, options.delete(:prefix))
    name_mappings = prefix_mappings.merge(name_mappings)

    socket.write(COMMAND_START)
    Marshal.dump([process_name, deployment_model.name, name_mappings, options], socket)
    wait_for_answer do |pid_s|
        if pid_s == RET_NO
            # A failure reply is followed by a marshalled message
            msg = Marshal.load(socket)
            raise Failed, "failed to start #{deployment_model.name}: #{msg}"
        elsif pid_s == RET_STARTED_PROCESS
            # A success reply is followed by the marshalled PID
            pid = Marshal.load(socket)
            process = Process.new(process_name, deployment_model, self, pid)
            process.name_mappings = name_mappings
            processes[process_name] = process
            return process
        else
            raise InternalError, "unexpected reply #{pid_s} to the start command"
        end
    end
end

#stop(deployment_name, wait) ⇒ Object

Requests to stop the given deployment

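If wait is false, the call returns as soon as the server has acknowledged the request, without waiting for the process to quit; call #wait_termination or #join to wait for the process end. If wait is true, the call blocks (through #join) until the process has terminated.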


251
252
253
254
255
256
257
258
259
260
261
# File 'lib/orocos/remote_processes/client.rb', line 251

# Requests to stop the given deployment
#
# Sends COMMAND_END followed by the marshalled deployment name, then
# waits for the server's acknowledgment.
#
# @param deployment_name the name of the deployment to stop
# @param wait if true, block in #join until the process has terminated
# @raise [Failed] if the server does not acknowledge the request
def stop(deployment_name, wait)
    socket.write(COMMAND_END)
    Marshal.dump(deployment_name, socket)
    acknowledged = wait_for_ack
    raise Failed, "failed to quit #{deployment_name}" unless acknowledged

    join(deployment_name) if wait
end

#to_sObject


51
52
53
# File 'lib/orocos/remote_processes/client.rb', line 51

# Human-readable description of this client
#
# @return [String] the class name followed by the host and port
def to_s; "#<Orocos::RemoteProcesses::Client #{host}:#{port}>" end

#wait_for_ackObject


139
140
141
142
143
144
145
146
147
148
149
# File 'lib/orocos/remote_processes/client.rb', line 139

# Waits for a yes/no reply from the process server
#
# @return [Boolean] true if the reply is RET_YES, false if it is RET_NO
# @raise [InternalError] on any other reply
def wait_for_ack
    wait_for_answer do |reply|
        case reply
        when RET_YES then return true
        when RET_NO then return false
        else raise InternalError, "unexpected reply #{reply}"
        end
    end
end

#wait_for_answer(timeout: 10) ⇒ Object


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/orocos/remote_processes/client.rb', line 122

# Reads replies from the process server and yields them to the block
#
# Death announcements (EVENT_DEAD_PROCESS) that arrive in the meantime
# are queued with #queue_death_announcement instead of being yielded.
# The method keeps reading after the block returns: the block is
# expected to leave it with a non-local exit (return, break or raise).
#
# @param timeout how long to wait for the socket to become readable
#   before each read, as passed to select
# @yieldparam reply the one-byte reply read from the socket
# @raise [TimeoutError] if the socket does not become readable in time
# @raise [Orocos::ComError] if reading from the socket fails (EOF)
def wait_for_answer(timeout: 10)
    while true
        ready = select([socket], [], [], timeout)
        raise TimeoutError, "reached timeout of #{timeout}s in #wait_for_answer" unless ready

        reply = socket.read(1)
        raise Orocos::ComError, "failed to read from process server #{self}" unless reply

        if reply == EVENT_DEAD_PROCESS
            queue_death_announcement
        else
            yield(reply)
        end
    end
end

#wait_termination(timeout = nil) ⇒ Object

Waits for processes to terminate. timeout is the number of seconds we should wait (it is passed as-is to select). If set to nil, the call will block until a process terminates

Returns a hash that maps the process objects registered in #processes to the exit status reported by the process server for them.


216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/orocos/remote_processes/client.rb', line 216

# Waits for processes to terminate
#
# If no death announcement is already queued, waits for the socket to
# become readable and reads every announcement that is available.
# Announcements for names that are not registered in #processes are
# logged and ignored.
#
# @param timeout the number of seconds to wait, as passed to select. If
#   nil, the call blocks until the socket becomes readable.
# @return [Hash] a mapping from the terminated process objects (as
#   registered in #processes) to the status reported by the server. It is
#   empty if nothing was reported, including on timeout and on EOF.
# @raise [RuntimeError] if the server sends anything else than a death
#   announcement
def wait_termination(timeout = nil)
    if @death_queue.empty?
        return Hash.new unless select([socket], nil, nil, timeout)

        loop do
            data = socket.read(1)
            # EOF: stop reading, but still report the announcements that
            # were queued before it instead of withholding them until the
            # next call
            break unless data

            if data != EVENT_DEAD_PROCESS
                raise "unexpected message #{data} from process server"
            end
            queue_death_announcement
            # Keep going only while more data is readable right away
            break unless select([socket], nil, nil, 0)
        end
    end

    result = Hash.new
    @death_queue.each do |name, status|
        Orocos.debug "#{name} died"
        if (process = processes.delete(name))
            process.dead!
            result[process] = status
        else
            Orocos.warn "process server reported the exit of '#{name}', but no process with that name is registered"
        end
    end
    @death_queue.clear

    result
end