Class: WebHDFS::ClientV1
- Inherits:
Object
- Ancestry: Object > WebHDFS::ClientV1
- Defined in:
- lib/webhdfs/client_v1.rb
Direct Known Subclasses
Constant Summary
- OPT_TABLE =
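This hash table maps each WebHDFS operation name (e.g. 'APPEND', 'CREATE') to the list of option names that operation accepts; check_options validates caller options against it.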
{}
- KNOWN_ERRORS =
internal use only
['LeaseExpiredException'].freeze
- REDIRECTED_OPERATIONS =
['APPEND', 'CREATE', 'OPEN', 'GETFILECHECKSUM']
Instance Attribute Summary (collapse)
-
- (Object) doas
Returns the value of attribute doas.
-
- (Object) host
Returns the value of attribute host.
-
- (Object) httpfs_mode
Returns the value of attribute httpfs_mode.
-
- (Object) open_timeout
Connection-open timeout in seconds applied to each Net::HTTP connection; when nil (the default), Ruby net/http's own default applies.
-
- (Object) port
Returns the value of attribute port.
-
- (Object) read_timeout
Read timeout in seconds applied to each Net::HTTP connection; when nil (the default), Ruby net/http's own default (60 s) applies.
-
- (Object) retry_interval
Seconds to sleep before retrying a request that failed with a known error (default 1; ignored when retry_known_errors is false).
-
- (Object) retry_known_errors
Whether requests that fail with one of KNOWN_ERRORS are retried (default false, i.e. no retry).
-
- (Object) retry_times
Maximum number of retries per request for a known error (default 1; ignored when retry_known_errors is false).
-
- (Object) username
Returns the value of attribute username.
Instance Method Summary (collapse)
- - (Object) api_path(path)
-
- (Object) append(path, body, options = {})
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]".
- - (Object) build_path(path, op, params)
-
- (Object) check_options(options, optdecl = [])
Raises ArgumentError if options contains any key not listed in optdecl.
- - (Object) check_success_json(res, attr = nil)
-
- (Object) checksum(path, options = {})
(also: #getfilechecksum)
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM".
-
- (Object) chmod(path, mode, options = {})
(also: #setpermission)
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION[&permission=<OCTAL>]".
-
- (Object) chown(path, options = {})
(also: #setowner)
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER[&owner=<USER>][&group=<GROUP>]".
-
- (Object) content_summary(path, options = {})
(also: #getcontentsummary)
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY".
-
- (Object) create(path, body, options = {})
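curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE[&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>][&permission=<OCTAL>][&buffersize=<INT>]".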
-
- (Object) delete(path, options = {})
curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE[&recursive=<true|false>]".
-
- (Object) homedir(options = {})
(also: #gethomedirectory)
curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY".
-
- (ClientV1) initialize(host = 'localhost', port = 50070, username = nil, doas = nil)
constructor
A new instance of ClientV1.
-
- (Object) list(path, options = {})
(also: #liststatus)
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS".
-
- (Object) mkdir(path, options = {})
(also: #mkdirs)
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]".
- - (Object) operate_requests(method, path, op, params = {}, payload = nil)
-
- (Object) read(path, options = {})
(also: #open)
curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]".
-
- (Object) rename(path, dest, options = {})
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>".
-
- (Object) replication(path, replnum, options = {})
(also: #setreplication)
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION[&replication=<SHORT>]".
-
- (Object) request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0)
Sends one HTTP request; HTTP status codes returned by WebHDFS for server-side exceptions: IllegalArgumentException → 400 Bad Request; UnsupportedOperationException → 400 Bad Request; SecurityException → 401 Unauthorized; IOException → 403 Forbidden; FileNotFoundException → 404 Not Found; RuntimeException → 500 Internal Server Error.
-
- (Object) stat(path, options = {})
(also: #getfilestatus)
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS".
-
- (Object) touch(path, options = {})
(also: #settimes)
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES[&modificationtime=<TIME>][&accesstime=<TIME>]".
Constructor Details
- (ClientV1) initialize(host = 'localhost', port = 50070, username = nil, doas = nil)
A new instance of ClientV1
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/webhdfs/client_v1.rb', line 22
# Creates a client for the WebHDFS (or HttpFS) endpoint at +host+:+port+.
#
# @param host [String] host that every operation is first sent to
# @param port [Integer] port on +host+
# @param username [String, nil] added as the user.name query parameter when set
# @param doas [String, nil] added as the doas query parameter when set
#
# Retrying of known errors starts disabled (retry_times and retry_interval
# both default to 1 for when it is enabled), and HttpFS mode starts off.
def initialize(host='localhost', port=50070, username=nil, doas=nil)
  @host, @port = host, port
  @username, @doas = username, doas

  @retry_known_errors = false
  @retry_times = @retry_interval = 1
  @httpfs_mode = false
end
Instance Attribute Details
- (Object) doas
Returns the value of attribute doas
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the doas value that build_path adds to request query strings
# (nil when not set).
def doas
  @doas
end
- (Object) host
Returns the value of attribute host
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the host every operation is first sent to (default 'localhost').
def host
  @host
end
- (Object) httpfs_mode
Returns the value of attribute httpfs_mode
17 18 19 |
# File 'lib/webhdfs/client_v1.rb', line 17
# Returns whether HttpFS mode is on (default false). In HttpFS mode every
# operation is a single request to host:port instead of following a redirect.
def httpfs_mode
  @httpfs_mode
end
- (Object) open_timeout
Connection-open timeout in seconds applied to each Net::HTTP connection; when nil (the default), Ruby net/http's own default applies.
15 16 17 |
# File 'lib/webhdfs/client_v1.rb', line 15
# Returns the connection-open timeout in seconds; nil (the default) leaves
# Net::HTTP's own default in place.
def open_timeout
  @open_timeout
end
- (Object) port
Returns the value of attribute port
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the port used together with host (default 50070).
def port
  @port
end
- (Object) read_timeout
Read timeout in seconds applied to each Net::HTTP connection; when nil (the default), Ruby net/http's own default (60 s) applies.
16 17 18 |
# File 'lib/webhdfs/client_v1.rb', line 16
# Returns the read timeout in seconds; nil (the default) leaves Net::HTTP's
# own default in place.
def read_timeout
  @read_timeout
end
- (Object) retry_interval
Seconds to sleep before retrying a request that failed with a known error (default 1; ignored when retry_known_errors is false).
20 21 22 |
# File 'lib/webhdfs/client_v1.rb', line 20
# Returns the number of seconds to sleep before retrying a known error
# (default 1; ignored when retry_known_errors is false).
def retry_interval
  @retry_interval
end
- (Object) retry_known_errors
Whether requests that fail with one of KNOWN_ERRORS are retried (default false, i.e. no retry).
18 19 20 |
# File 'lib/webhdfs/client_v1.rb', line 18
# Returns whether requests failing with an exception listed in KNOWN_ERRORS
# are retried (default false).
def retry_known_errors
  @retry_known_errors
end
- (Object) retry_times
Maximum number of retries per request for a known error (default 1; ignored when retry_known_errors is false).
19 20 21 |
# File 'lib/webhdfs/client_v1.rb', line 19
# Returns the maximum number of retries for a known error (default 1;
# ignored when retry_known_errors is false).
def retry_times
  @retry_times
end
- (Object) username
Returns the value of attribute username
14 15 16 |
# File 'lib/webhdfs/client_v1.rb', line 14
# Returns the user name that build_path sends as the user.name query
# parameter (nil when not set).
def username
  @username
end
Instance Method Details
- (Object) api_path(path)
203 204 205 206 207 208 209 |
# File 'lib/webhdfs/client_v1.rb', line 203
# Maps an HDFS path onto the WebHDFS v1 REST root.
#
# A leading '/' on +path+ is optional: 'a/b' and '/a/b' both become
# '/webhdfs/v1/a/b'.
#
# @param path [String]
# @return [String]
def api_path(path)
  separator = path.start_with?('/') ? '' : '/'
  '/webhdfs/v1' + separator + path
end
- (Object) append(path, body, options = {})
48 49 50 51 52 53 54 55 |
# File 'lib/webhdfs/client_v1.rb', line 48
# Appends +body+ to the existing file at +path+ (WebHDFS op=APPEND, HTTP POST).
#
# @param path [String] HDFS path of the file to append to
# @param body [String] bytes to append
# @param options [Hash] extra query parameters; validated against OPT_TABLE['APPEND']
# @return [Boolean] true when the final response code is 200
# @raise [ArgumentError] if +options+ contains an unsupported key
def append(path, body, options={})
  # In HttpFS mode the payload goes out in a single request (see
  # operate_requests), flagged with data=true; it is merged before validation,
  # so OPT_TABLE['APPEND'] must list 'data'.
  options = options.merge({'data' => 'true'}) if @httpfs_mode
  check_options(options, OPT_TABLE['APPEND'])
  res = operate_requests('POST', path, 'APPEND', options, body)
  res.code == '200'
end
- (Object) build_path(path, op, params)
211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/webhdfs/client_v1.rb', line 211
# Builds the request path and query string for a WebHDFS operation.
#
# The query is +params+ plus 'op' and, when configured, 'user.name' and
# 'doas'. Those client-controlled keys take precedence over same-named keys
# in +params+.
#
# @param path [String] HDFS path (leading '/' optional)
# @param op [String] WebHDFS operation name, e.g. 'LISTSTATUS'
# @param params [Hash] additional query parameters
# @return [String] e.g. '/webhdfs/v1/tmp?op=LISTSTATUS&user.name=alice'
def build_path(path, op, params)
  identity = {'op' => op}
  identity['user.name'] = @username if @username
  identity['doas'] = @doas if @doas

  query = URI.encode_www_form(params.merge(identity))
  api_path(path) + '?' + query
end
- (Object) check_options(options, optdecl = [])
# NOTE(review): the generated docs show these three stubs as the description
# of check_options. That suggests they are commented-out code sitting just
# above check_options in the source; confirm, then move or delete them there.
#
# Delegation-token operations are not implemented by this client. Each stub
# raises NotImplementedError naming the unsupported WebHDFS operation.
def delegation_token(user, options={}) # GETDELEGATIONTOKEN
  raise NotImplementedError, 'GETDELEGATIONTOKEN is not supported'
end

def renew_delegation_token(token, options={}) # RENEWDELEGATIONTOKEN
  raise NotImplementedError, 'RENEWDELEGATIONTOKEN is not supported'
end

def cancel_delegation_token(token, options={}) # CANCELDELEGATIONTOKEN
  raise NotImplementedError, 'CANCELDELEGATIONTOKEN is not supported'
end
194 195 196 197 |
# File 'lib/webhdfs/client_v1.rb', line 194
# Validates that every key in +options+ is a permitted option name.
#
# @param options [Hash] caller-supplied options; keys may be Strings or Symbols
# @param optdecl [Array<String>, nil] permitted option names; nil permits none
# @raise [ArgumentError] listing every unknown option name
def check_options(options, optdecl=[])
  # Keys are compared as Strings so symbol keys (owner:) match 'owner'.
  ex = options.keys.map(&:to_s) - (optdecl || [])
  raise ArgumentError, "no such option: #{ex.join(' ')}" unless ex.empty?
end
- (Object) check_success_json(res, attr = nil)
199 200 201 |
# File 'lib/webhdfs/client_v1.rb', line 199
# Checks that +res+ is a 200 response with an application/json content type.
#
# @param res [Net::HTTPResponse] response to inspect
# @param attr [String, nil] top-level JSON key to extract
# @return [Object, Boolean, nil] the value at +attr+ (nil when absent) if
#   +attr+ is given; true if +attr+ is nil; false when the status or content
#   type does not match
def check_success_json(res, attr=nil)
  res.code == '200' &&
    res.content_type == 'application/json' &&
    (attr.nil? || JSON.parse(res.body)[attr])
end
- (Object) checksum(path, options = {}) Also known as: getfilechecksum
121 122 123 124 125 |
# File 'lib/webhdfs/client_v1.rb', line 121
# Fetches the checksum of the file at +path+ (WebHDFS op=GETFILECHECKSUM).
#
# @param path [String] HDFS path of the file
# @param options [Hash] extra query parameters; validated against OPT_TABLE['GETFILECHECKSUM']
# @return [Hash, false, nil] the parsed "FileChecksum" object; falsy when the
#   response is not a 200 JSON response or lacks that key
# @raise [ArgumentError] if +options+ contains an unsupported key
def checksum(path, options={})
  check_options(options, OPT_TABLE['GETFILECHECKSUM'])
  res = operate_requests('GET', path, 'GETFILECHECKSUM', options)
  check_success_json(res, 'FileChecksum')
end
- (Object) chmod(path, mode, options = {}) Also known as: setpermission
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION[&permission=<OCTAL>]"
138 139 140 141 142 |
# File 'lib/webhdfs/client_v1.rb', line 138
# Sets the permission of +path+ (WebHDFS op=SETPERMISSION).
#
# @param path [String] HDFS path
# @param mode [Object] sent as-is as the permission parameter (presumably an
#   octal String such as '755'; confirm with callers)
# @param options [Hash] extra query parameters; validated against OPT_TABLE['SETPERMISSION']
# @return [Boolean] true when the response code is 200
# @raise [ArgumentError] if +options+ contains an unsupported key
def chmod(path, mode, options={})
  check_options(options, OPT_TABLE['SETPERMISSION'])
  res = operate_requests('PUT', path, 'SETPERMISSION', options.merge({'permission' => mode}))
  res.code == '200'
end
- (Object) chown(path, options = {}) Also known as: setowner
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER[&owner=<USER>][&group=<GROUP>]"
147 148 149 150 151 152 153 154 155 |
# File 'lib/webhdfs/client_v1.rb', line 147
# Sets the owner and/or group of +path+ (WebHDFS op=SETOWNER).
#
# @param path [String] HDFS path
# @param options [Hash] must contain 'owner' and/or 'group' (String or Symbol
#   keys); validated against OPT_TABLE['SETOWNER']
# @return [Boolean] true when the response code is 200
# @raise [ArgumentError] if an option is unsupported or neither owner nor group is given
def chown(path, options={})
  check_options(options, OPT_TABLE['SETOWNER'])
  unless %w[owner group].any? { |key| options.key?(key) || options.key?(key.to_sym) }
    raise ArgumentError, "'chown' needs at least one of owner or group"
  end
  res = operate_requests('PUT', path, 'SETOWNER', options)
  res.code == '200'
end
- (Object) content_summary(path, options = {}) Also known as: getcontentsummary
113 114 115 116 117 |
# File 'lib/webhdfs/client_v1.rb', line 113
# Fetches the content summary of +path+ (WebHDFS op=GETCONTENTSUMMARY).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters; validated against OPT_TABLE['GETCONTENTSUMMARY']
# @return [Hash, false, nil] the parsed "ContentSummary" object; falsy when
#   the response is not a 200 JSON response or lacks that key
# @raise [ArgumentError] if +options+ contains an unsupported key
def content_summary(path, options={})
  check_options(options, OPT_TABLE['GETCONTENTSUMMARY'])
  res = operate_requests('GET', path, 'GETCONTENTSUMMARY', options)
  check_success_json(res, 'ContentSummary')
end
- (Object) create(path, body, options = {})
37 38 39 40 41 42 43 44 |
# File 'lib/webhdfs/client_v1.rb', line 37
# Creates the file at +path+ with contents +body+ (WebHDFS op=CREATE, HTTP PUT).
#
# @param path [String] HDFS path of the new file
# @param body [String] file contents
# @param options [Hash] extra query parameters; validated against OPT_TABLE['CREATE']
# @return [Boolean] true when the final response code is 201 (Created)
# @raise [ArgumentError] if +options+ contains an unsupported key
def create(path, body, options={})
  # In HttpFS mode the payload goes out in a single request (see
  # operate_requests), flagged with data=true; it is merged before validation,
  # so OPT_TABLE['CREATE'] must list 'data'.
  options = options.merge({'data' => 'true'}) if @httpfs_mode
  check_options(options, OPT_TABLE['CREATE'])
  res = operate_requests('PUT', path, 'CREATE', options, body)
  res.code == '201'
end
- (Object) delete(path, options = {})
curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE[&recursive=<true|false>]"
89 90 91 92 93 |
# File 'lib/webhdfs/client_v1.rb', line 89
# Deletes +path+ (WebHDFS op=DELETE).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters; validated against OPT_TABLE['DELETE']
# @return [Boolean, nil] the "boolean" value from the JSON response; false
#   when the response is not a 200 JSON response
# @raise [ArgumentError] if +options+ contains an unsupported key
def delete(path, options={})
  check_options(options, OPT_TABLE['DELETE'])
  res = operate_requests('DELETE', path, 'DELETE', options)
  check_success_json(res, 'boolean')
end
- (Object) homedir(options = {}) Also known as: gethomedirectory
129 130 131 132 133 |
# File 'lib/webhdfs/client_v1.rb', line 129
# Fetches the caller's home directory (WebHDFS op=GETHOMEDIRECTORY, sent for path '/').
#
# @param options [Hash] extra query parameters; validated against OPT_TABLE['GETHOMEDIRECTORY']
# @return [String, false, nil] the "Path" value; falsy when the response is
#   not a 200 JSON response or lacks that key
# @raise [ArgumentError] if +options+ contains an unsupported key
def homedir(options={})
  check_options(options, OPT_TABLE['GETHOMEDIRECTORY'])
  res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', options)
  check_success_json(res, 'Path')
end
- (Object) list(path, options = {}) Also known as: liststatus
105 106 107 108 109 |
# File 'lib/webhdfs/client_v1.rb', line 105
# Lists the directory at +path+ (WebHDFS op=LISTSTATUS).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters; validated against OPT_TABLE['LISTSTATUS']
# @return [Array<Hash>, false, nil] the "FileStatus" entries; falsy when the
#   response is not a 200 JSON response or lacks "FileStatuses"
# @raise [ArgumentError] if +options+ contains an unsupported key
def list(path, options={})
  check_options(options, OPT_TABLE['LISTSTATUS'])
  res = operate_requests('GET', path, 'LISTSTATUS', options)
  statuses = check_success_json(res, 'FileStatuses')
  # check_success_json returns false/nil on failure; indexing that directly
  # used to raise NoMethodError instead of reporting failure.
  statuses && statuses['FileStatus']
end
- (Object) mkdir(path, options = {}) Also known as: mkdirs
69 70 71 72 73 |
# File 'lib/webhdfs/client_v1.rb', line 69
# Creates the directory +path+ (WebHDFS op=MKDIRS).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters; validated against OPT_TABLE['MKDIRS']
# @return [Boolean, nil] the "boolean" value from the JSON response; false
#   when the response is not a 200 JSON response
# @raise [ArgumentError] if +options+ contains an unsupported key
def mkdir(path, options={})
  check_options(options, OPT_TABLE['MKDIRS'])
  res = operate_requests('PUT', path, 'MKDIRS', options)
  check_success_json(res, 'boolean')
end
- (Object) operate_requests(method, path, op, params = {}, payload = nil)
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/webhdfs/client_v1.rb', line 226
# Dispatches one WebHDFS operation and returns the final response.
#
# Outside HttpFS mode, operations listed in REDIRECTED_OPERATIONS take two
# steps:
# 1. A payload-less request to host:port, which must answer with a
#    redirection carrying a Location header.
# 2. The real request to that location, sending +payload+ with an
#    octet-stream Content-Type.
# Every other operation, and every operation in HttpFS mode, is a single
# request to host:port. In HttpFS mode a non-nil payload is sent with an
# octet-stream Content-Type.
#
# @raise [WebHDFS::RequestFailedError] if the first step of a two-step
#   operation is not a redirection with a Location header
def operate_requests(method, path, op, params={}, payload=nil)
  octet_stream = {'Content-Type' => 'application/octet-stream'}

  if @httpfs_mode || !REDIRECTED_OPERATIONS.include?(op)
    if @httpfs_mode && !payload.nil?
      return request(@host, @port, method, path, op, params, payload, octet_stream)
    end
    return request(@host, @port, method, path, op, params, payload)
  end

  first = request(@host, @port, method, path, op, params, nil)
  location = first.is_a?(Net::HTTPRedirection) ? first['location'] : nil
  unless location
    msg = "NameNode returns non-redirection (or without location header), code:#{first.code}, body:#{first.body}."
    raise WebHDFS::RequestFailedError, msg
  end

  target = URI.parse(location)
  target_path = target.query ? target.path + '?' + target.query : target.path
  request(target.host, target.port, method, target_path, nil, {}, payload, octet_stream)
end
- (Object) read(path, options = {}) Also known as: open
60 61 62 63 64 |
# File 'lib/webhdfs/client_v1.rb', line 60
# Reads the file at +path+ (WebHDFS op=OPEN).
#
# @param path [String] HDFS path of the file
# @param options [Hash] extra query parameters; validated against OPT_TABLE['OPEN']
# @return [String] the body of the final response
# @raise [ArgumentError] if +options+ contains an unsupported key
def read(path, options={})
  check_options(options, OPT_TABLE['OPEN'])
  res = operate_requests('GET', path, 'OPEN', options)
  res.body
end
- (Object) rename(path, dest, options = {})
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
78 79 80 81 82 83 84 85 |
# File 'lib/webhdfs/client_v1.rb', line 78
# Renames +path+ to +dest+ (WebHDFS op=RENAME).
#
# @param path [String] current HDFS path
# @param dest [String] new HDFS path; a leading '/' is added when missing
# @param options [Hash] extra query parameters; validated against OPT_TABLE['RENAME']
# @return [Boolean, nil] the "boolean" value from the JSON response; false
#   when the response is not a 200 JSON response
# @raise [ArgumentError] if +options+ contains an unsupported key
def rename(path, dest, options={})
  check_options(options, OPT_TABLE['RENAME'])
  dest = '/' + dest unless dest.start_with?('/')
  res = operate_requests('PUT', path, 'RENAME', options.merge({'destination' => dest}))
  check_success_json(res, 'boolean')
end
- (Object) replication(path, replnum, options = {}) Also known as: setreplication
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION[&replication=<SHORT>]"
161 162 163 164 165 |
# File 'lib/webhdfs/client_v1.rb', line 161
# Sets the replication factor of +path+ (WebHDFS op=SETREPLICATION).
#
# @param path [String] HDFS path
# @param replnum [#to_s] replication factor; sent as its String form
# @param options [Hash] extra query parameters; validated against OPT_TABLE['SETREPLICATION']
# @return [Boolean, nil] the "boolean" value from the JSON response; false
#   when the response is not a 200 JSON response
# @raise [ArgumentError] if +options+ contains an unsupported key
def replication(path, replnum, options={})
  check_options(options, OPT_TABLE['SETREPLICATION'])
  res = operate_requests('PUT', path, 'SETREPLICATION', options.merge({'replication' => replnum.to_s}))
  check_success_json(res, 'boolean')
end
- (Object) request(host, port, method, path, op = nil, params = {}, payload = nil, header = nil, retries = 0)
Sends one HTTP request; HTTP status codes returned by WebHDFS for server-side exceptions: IllegalArgumentException → 400 Bad Request; UnsupportedOperationException → 400 Bad Request; SecurityException → 401 Unauthorized; IOException → 403 Forbidden; FileNotFoundException → 404 Not Found; RuntimeException → 500 Internal Server Error.
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/webhdfs/client_v1.rb', line 255
# Sends one HTTP request and returns the response, or raises on failure.
#
# Request path:
# - With +op+, the path is built by build_path (adds op, user.name, doas).
# - Without +op+, +path+ is used verbatim (e.g. a redirect target).
#
# 2xx and 3xx responses are returned as-is.
#
# Retries: if retry_known_errors is enabled and the error body is a WebHDFS
# RemoteException whose exception name is in KNOWN_ERRORS, the request is
# retried up to retry_times times, sleeping retry_interval seconds before
# each retry.
#
# Status code to exception mapping:
#   400       -> WebHDFS::ClientError        (IllegalArgumentException, UnsupportedOperationException)
#   401       -> WebHDFS::SecurityError      (SecurityException)
#   403       -> WebHDFS::IOError            (IOException)
#   404       -> WebHDFS::FileNotFoundError  (FileNotFoundException)
#   500       -> WebHDFS::ServerError        (RuntimeException)
#   any other -> WebHDFS::RequestFailedError
#
# @param retries [Integer] retries already made (internal; callers pass 0)
def request(host, port, method, path, op=nil, params={}, payload=nil, header=nil, retries=0)
  conn = Net::HTTP.new(host, port)
  conn.open_timeout = @open_timeout if @open_timeout
  conn.read_timeout = @read_timeout if @read_timeout

  request_path = op ? build_path(path, op, params) : path
  res = conn.send_request(method, request_path, payload, header)
  return res if res.is_a?(Net::HTTPSuccess) || res.is_a?(Net::HTTPRedirection)

  # Error bodies are flattened to one line for the exception message.
  message = if res.body && !res.body.empty?
              res.body.gsub(/\n/, '')
            else
              'Response body is empty...'
            end

  if @retry_known_errors && retries < @retry_times
    detail = nil
    if message.start_with?('{"RemoteException":{')
      begin
        detail = JSON.parse(message)
      rescue JSON::ParserError
        # Best effort: a broken JSON body just means "not a known error".
      end
    end
    remote = detail && detail['RemoteException']
    if remote && KNOWN_ERRORS.include?(remote['exception'])
      sleep @retry_interval if @retry_interval && @retry_interval > 0
      return request(host, port, method, path, op, params, payload, header, retries + 1)
    end
  end

  case res.code
  when '400' then raise WebHDFS::ClientError, message
  when '401' then raise WebHDFS::SecurityError, message
  when '403' then raise WebHDFS::IOError, message
  when '404' then raise WebHDFS::FileNotFoundError, message
  when '500' then raise WebHDFS::ServerError, message
  else raise WebHDFS::RequestFailedError, "response code:#{res.code}, message:#{message}"
  end
end
- (Object) stat(path, options = {}) Also known as: getfilestatus
97 98 99 100 101 |
# File 'lib/webhdfs/client_v1.rb', line 97
# Fetches the status of +path+ (WebHDFS op=GETFILESTATUS).
#
# @param path [String] HDFS path
# @param options [Hash] extra query parameters; validated against OPT_TABLE['GETFILESTATUS']
# @return [Hash, false, nil] the parsed "FileStatus" object; falsy when the
#   response is not a 200 JSON response or lacks that key
# @raise [ArgumentError] if +options+ contains an unsupported key
def stat(path, options={})
  check_options(options, OPT_TABLE['GETFILESTATUS'])
  res = operate_requests('GET', path, 'GETFILESTATUS', options)
  check_success_json(res, 'FileStatus')
end
- (Object) touch(path, options = {}) Also known as: settimes
172 173 174 175 176 177 178 179 180 |
# File 'lib/webhdfs/client_v1.rb', line 172
# Sets the modification and/or access time of +path+ (WebHDFS op=SETTIMES).
#
# @param path [String] HDFS path
# @param options [Hash] must contain 'modificationtime' and/or 'accesstime'
#   (String or Symbol keys); validated against OPT_TABLE['SETTIMES']
# @return [Boolean] true when the response code is 200
# @raise [ArgumentError] if an option is unsupported or neither time is given
def touch(path, options={})
  check_options(options, OPT_TABLE['SETTIMES'])
  unless %w[modificationtime accesstime].any? { |key| options.key?(key) || options.key?(key.to_sym) }
    # The message previously said 'chown' (copy-paste slip from chown).
    raise ArgumentError, "'touch' needs at least one of modificationtime or accesstime"
  end
  res = operate_requests('PUT', path, 'SETTIMES', options)
  res.code == '200'
end