Class: IO::LikeHelpers::BufferedIO

Inherits:
DelegatedIO show all
Defined in:
lib/io/like_helpers/buffered_io.rb

Overview

This class implements a stream that buffers data read from or written to a delegate.

Constant Summary collapse

DEFAULT_BUFFER_SIZE =

The default size of the internal buffer.

8192

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from DelegatedIO

#advise, #autoclose=, #autoclose?, #close_on_exec=, #close_on_exec?, create_finalizer, #fcntl, #fileno, #inspect, #ioctl, #nonblock=, #nonblock?, #path, #pid, #readable?, #ready?, #stat, #to_io, #tty?, #writable?

Methods inherited from AbstractIO

#advise, #close_on_exec=, #close_on_exec?, #closed?, #fcntl, #fileno, #ioctl, #nonblock, #nonblock=, #nonblock?, open, #path, #pid, #readable?, #ready?, #stat, #to_io, #tty?, #writable?

Constructor Details

#initialize(delegate, autoclose: true, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ BufferedIO

Creates a new intance of this class.

Parameters:

  • delegate (LikeHelpers::AbstractIO)

    a readable and/or writable stream

  • autoclose (Boolean) (defaults to: true)

    when ‘true` close the delegate when this stream is closed

  • buffer_size (Integer) (defaults to: DEFAULT_BUFFER_SIZE)

    the size of the internal buffer in bytes



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/io/like_helpers/buffered_io.rb', line 23

def initialize(delegate, autoclose: true, buffer_size: DEFAULT_BUFFER_SIZE)
  buffer_size = Integer(buffer_size)
  if buffer_size <= 0
    raise ArgumentError, 'buffer_size must be greater than 0'
  end

  super(delegate, autoclose: autoclose)

  @buffer_size = buffer_size
  @buffer = "\0".b * @buffer_size
  @start_idx = @end_idx = @unread_offset = 0
  @mode = nil
end

Instance Attribute Details

#buffer_sizeObject (readonly)

The size of the internal buffer in bytes.



39
40
41
# File 'lib/io/like_helpers/buffered_io.rb', line 39

def buffer_size
  @buffer_size
end

Instance Method Details

#closenil, ...

Closes this stream, flushing data from the write buffer first if necessary.

The delegate is closed if autoclose is enabled for the stream.

Returns:

  • (nil)

    on success

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/io/like_helpers/buffered_io.rb', line 49

def close
  return nil if closed?

  begin
    result = flush if @mode == :write
  ensure
    # Complete the closing process if #flush completed normally or an
    # exception was raised.
    result = super unless Symbol === result
  end
  result
end

#fdatasync0, ...

Flushes any data in the write buffer and then forwards the call to the delegate.

Returns:

  • (0, nil)

    on success

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block



69
70
71
72
73
74
75
# File 'lib/io/like_helpers/buffered_io.rb', line 69

def fdatasync
  assert_open

  result = flush
  return result if Symbol === result
  super
end

#flushnil, ...

Flushes any data in the write buffer to the delegate.

Returns:

  • (nil)

    on success

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (IOError)

    if the stream is closed



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/io/like_helpers/buffered_io.rb', line 85

def flush
  assert_open

  set_write_mode

  while @start_idx < @end_idx do
    remaining = @end_idx - @start_idx
    result = delegate.write(@buffer[@start_idx, remaining])
    return result if Symbol === result
    @start_idx += result
  end
  nil
end

#fsync0, ...

Flushes any data in the write buffer and then forwards the call to the delegate.

Returns:

  • (0, nil)

    on success

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (IOError)

    if the stream is closed



108
109
110
111
112
# File 'lib/io/like_helpers/buffered_io.rb', line 108

def fsync
  result = flush
  return result if Symbol === result
  super
end

#nreadInteger

Returns the number of bytes available to read from the internal buffer.

Returns:

  • (Integer)

    the number of bytes available to read from the internal buffer

Raises:

  • (IOError)

    if the stream is not readable



119
120
121
122
123
124
# File 'lib/io/like_helpers/buffered_io.rb', line 119

def nread
  assert_readable

  return 0 if read_buffer_empty?
  return @end_idx - @start_idx
end

#peek(length = nil) ⇒ String

Reads up to ‘length` bytes from the read buffer but does not advance the stream position.

Parameters:

  • length (Integer, nil) (defaults to: nil)

    the number of bytes to read or ‘nil` for all bytes

Returns:

  • (String)

    a buffer containing the bytes read

Raises:

  • (IOError)

    if the stream is not readable



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/io/like_helpers/buffered_io.rb', line 136

def peek(length = nil)
  if length.nil?
    length = nread
  else
    length = Integer(length)
    raise ArgumentError, 'length must be at least 0' if length < 0
  end

  assert_readable

  return ''.b unless @mode == :read

  available = @end_idx - @start_idx
  length = available if available < length
  return @buffer[@start_idx, length]
end

#pread(length, offset, buffer: nil, buffer_offset: 0) ⇒ Integer, ...

Note:

This method is not thread safe. Override it and add a mutex if thread safety is desired.

Reads at most ‘length` bytes from the stream starting at `offset` without modifying the read position in the stream.

Note that a partial read will occur if the stream is in non-blocking mode and reading more bytes would block.

Parameters:

  • length (Integer)

    the maximum number of bytes to read

  • offset (Integer)

    the offset from the beginning of the stream at which to begin reading

  • buffer (String) (defaults to: nil)

    if provided, a buffer into which the bytes should be placed

  • buffer_offset (Integer) (defaults to: 0)

    the index at which to insert bytes into ‘buffer`

Returns:

  • (Integer)

    the number of bytes read if ‘buffer` is not `nil`

  • (String)

    a new String containing the bytes read if ‘buffer` is `nil` or `buffer` if provided

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (EOFError)

    when reading at the end of the stream

  • (IOError)

    if the stream is not readable



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/io/like_helpers/buffered_io.rb', line 179

def pread(length, offset, buffer: nil, buffer_offset: 0)
  offset = Integer(offset)
  raise ArgumentError, 'offset must be at least 0' if offset < 0
  length = Integer(length)
  raise ArgumentError, 'length must be at least 0' if length < 0
  if ! buffer.nil?
    if buffer_offset < 0 || buffer_offset >= buffer.bytesize
      raise ArgumentError, 'buffer_offset is not a valid buffer index'
    end
    if buffer.bytesize - buffer_offset < length
      raise ArgumentError, 'length is greater than available buffer space'
    end
  end

  assert_readable

  result = set_read_mode
  return result if Symbol === result

  super
end

#pwrite(buffer, offset, length: buffer.bytesize) ⇒ Integer, ...

Note:

This method is not thread safe. Override it and add a mutex if thread safety is desired.

Writes at most ‘length` bytes to the stream starting at `offset` without modifying the write position in the stream.

Note that a partial write will occur if the stream is in non-blocking mode and writing more bytes would block.

Parameters:

  • buffer (String)

    the bytes to write (encoding assumed to be binary)

  • offset (Integer)

    the offset from the beginning of the stream at which to begin writing

  • length (Integer) (defaults to: buffer.bytesize)

    the number of bytes to write from ‘buffer`

Returns:

  • (Integer)

    the number of bytes written

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (IOError)

    if the stream is not writable



221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/io/like_helpers/buffered_io.rb', line 221

def pwrite(buffer, offset, length: buffer.bytesize)
  offset = Integer(offset)
  raise ArgumentError, 'offset must be at least 0' if offset < 0
  length = Integer(length)
  raise ArgumentError, 'length must be at least 0' if length < 0

  assert_writable

  set_write_mode

  super
end

#read(length, buffer: nil, buffer_offset: 0) ⇒ Integer, ...

Reads bytes from the stream.

Note that a partial read will occur if the stream is in non-blocking mode and reading more bytes would block.

Parameters:

  • length (Integer)

    the number of bytes to read

  • buffer (String) (defaults to: nil)

    the buffer into which bytes will be read (encoding assumed to be binary)

  • buffer_offset (Integer) (defaults to: 0)

    the index at which to insert bytes into ‘buffer`

Returns:

  • (Integer)

    the number of bytes read if ‘buffer` is not `nil`

  • (String)

    a buffer containing the bytes read if ‘buffer` is `nil`

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (EOFError)

    when reading at the end of the stream

  • (IOError)

    if the stream is not readable



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/io/like_helpers/buffered_io.rb', line 253

def read(length, buffer: nil, buffer_offset: 0)
  length = Integer(length)
  raise ArgumentError, 'length must be at least 0' if length < 0
  if ! buffer.nil?
    if buffer_offset < 0 || buffer_offset >= buffer.bytesize
      raise ArgumentError, 'buffer_offset is not a valid buffer index'
    end
    if buffer.bytesize - buffer_offset < length
      raise ArgumentError, 'length is greater than available buffer space'
    end
  end

  # Reload the internal buffer when empty.
  if read_buffer_empty?
    result = refill
    return result if Symbol === result
  end

  available = @end_idx - @start_idx
  length = available if available < length
  content = @buffer[@start_idx, length]
  @start_idx += length
  @unread_offset += [@unread_offset, length].min
  return content if buffer.nil?

  buffer[buffer_offset, length] = content
  return length
end

#read_buffer_empty?Boolean

Returns ‘true` if the read buffer is empty and `false` otherwise.

Returns:

  • (Boolean)


286
287
288
# File 'lib/io/like_helpers/buffered_io.rb', line 286

def read_buffer_empty?
  @mode != :read || @start_idx >= @end_idx
end

#refillInteger, ...

Refills the read buffer.

Returns:

  • (Integer)

    the number of bytes added to the read buffer

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (EOFError)

    when reading at the end of the stream

  • (IOError)

    if the stream is not readable



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/io/like_helpers/buffered_io.rb', line 299

def refill
  assert_readable

  result = set_read_mode
  return result if Symbol === result

  remaining = @end_idx - @start_idx
  available = buffer_size - remaining
  if available == 0
    # The read buffer is already full.
    return 0
  elsif available >= buffer_size
    # The read buffer is empty, so prepare to fill it at the beginning.
    @start_idx = @end_idx = @unread_offset = 0
  elsif @start_idx > 0
    # Shift the remaining buffer content to the beginning of the buffer.
    @buffer[0, remaining] = @buffer[@start_idx, remaining]
    @start_idx = 0
    @end_idx = remaining
  end

  result =
    delegate.read(available, buffer: @buffer, buffer_offset: @end_idx)

  # Return non-integer results from the delegate.
  return result if Symbol === result

  @end_idx += result

  result
end

#seek(amount, whence = IO::SEEK_SET) ⇒ Integer

Sets the current stream position to ‘amount` based on the setting of `whence`.

| ‘whence` | `amount` Interpretation | | ——– | ———————– | | `:CUR` or `IO::SEEK_CUR` | `amount` added to current stream position | | `:END` or `IO::SEEK_END` | `amount` added to end of stream position (`amount` will usually be negative here) | | `:SET` or `IO::SEEK_SET` | `amount` used as absolute position |

Parameters:

  • amount (Integer)

    the amount to move the position in bytes

  • whence (Integer, Symbol) (defaults to: IO::SEEK_SET)

    the position alias from which to consider ‘amount`

Returns:

  • (Integer)

    the new stream position

Raises:

  • (IOError)

    if the stream is closed

  • (Errno::ESPIPE)

    if the stream is not seekable



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/io/like_helpers/buffered_io.rb', line 349

def seek(amount, whence = IO::SEEK_SET)
  case @mode
  when :write
    result = flush
    return result if Symbol === result
  when :read
    case whence
    when IO::SEEK_CUR, :CUR
      amount -= @end_idx - @start_idx - @unread_offset
    end
  end
  @mode = nil

  result = super(amount, whence)
  # Clear the buffer only if the seek was successful.
  @start_idx = @end_idx = @unread_offset = 0
  result
end

#skip(length = nil) ⇒ Integer

Advances forward in the read buffer up to ‘length` bytes.

Parameters:

  • length (Integer, nil) (defaults to: nil)

    the number of bytes to skip or ‘nil` for all bytes

Returns:

  • (Integer)

    the number of bytes actually skipped

Raises:

  • (IOError)

    if the stream is not readable



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/io/like_helpers/buffered_io.rb', line 377

def skip(length = nil)
  if length.nil?
    length = nread
  else
    length = Integer(length)
    raise ArgumentError, 'length must be at least 0' if length < 0
  end

  assert_readable

  return 0 unless @mode == :read

  remaining = @end_idx - @start_idx
  length = remaining if length > remaining
  @start_idx += length
  @unread_offset += [@unread_offset, length].min

  length
end

#unread(buffer, length: buffer.bytesize) ⇒ nil

Places bytes at the beginning of the read buffer.

Parameters:

  • buffer (String)

    the bytes to insert into the read buffer

  • length (Integer) (defaults to: buffer.bytesize)

    the number of bytes from the beginning of ‘buffer` to insert into the read buffer

Returns:

  • (nil)

Raises:

  • (IOError)

    if the remaining space in the internal buffer is insufficient to contain the given data

  • (IOError)

    if the stream is not readable



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/io/like_helpers/buffered_io.rb', line 409

def unread(buffer, length: buffer.bytesize)
  length = Integer(length)
  raise ArgumentError, 'length must be at least 0' if length < 0

  assert_readable

  result = set_read_mode
  return result if Symbol === result

  used = @end_idx - @start_idx
  if length > @buffer_size - used
    raise IOError, 'insufficient buffer space for unread'
  end

  if length > @start_idx
    # Shift the available buffer content to the end of the buffer
    new_start_idx = @buffer_size - used
    @buffer[new_start_idx, used] = @buffer[@start_idx, used]
    @start_idx = new_start_idx
    @end_idx = @buffer_size
  end

  @start_idx -= length
  @unread_offset += length
  @buffer[@start_idx, length] = buffer[0, length]

  nil
end

#wait(events, timeout = nil) ⇒ true, false

Waits until the stream becomes ready for at least 1 of the specified events.

Parameters:

  • events (Integer)

    a bit mask of ‘IO::READABLE`, `IO::WRITABLE`, or `IO::PRIORITY`

  • timeout (Numeric, nil) (defaults to: nil)

    the timeout in seconds or no timeout if ‘nil`

Returns:

  • (true)

    if the stream becomes ready for at least one of the given events

  • (false)

    if the IO does not become ready before the timeout



448
449
450
451
452
453
454
455
456
# File 'lib/io/like_helpers/buffered_io.rb', line 448

def wait(events, timeout = nil)
  assert_open

  if events & (IO::READABLE | IO::PRIORITY) > 0 && ! read_buffer_empty?
    return true
  end

  super
end

#write(buffer, length: buffer.bytesize) ⇒ Integer, ...

Writes bytes to the stream.

Note that a partial write will occur if the stream is in non-blocking mode and writing more bytes would block.

Parameters:

  • buffer (String)

    the bytes to write (encoding assumed to be binary)

  • length (Integer) (defaults to: buffer.bytesize)

    the number of bytes to write from ‘buffer`

Returns:

  • (Integer)

    the number of bytes written

  • (:wait_readable, :wait_writable)

    if the stream is non-blocking and the operation would block

Raises:

  • (IOError)

    if the stream is not writable



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
# File 'lib/io/like_helpers/buffered_io.rb', line 472

def write(buffer, length: buffer.bytesize)
  length = Integer(length)
  raise ArgumentError, 'length must be at least 0' if length < 0

  assert_writable

  set_write_mode

  available = @buffer_size - @end_idx
  if available <= 0
    result = flush
    return result if Symbol === result

    @start_idx = @end_idx = @unread_offset = 0
    available = @buffer_size
  end

  length = available if available < length
  @buffer[@end_idx, length] = buffer.b[0, length]
  @end_idx += length
  length
end

#write_buffer_empty?Boolean

Returns ‘true` if the write buffer it empty and `false` otherwise.

Returns:

  • (Boolean)


499
500
501
# File 'lib/io/like_helpers/buffered_io.rb', line 499

def write_buffer_empty?
  @mode != :write || @start_idx >= @end_idx
end