Project

rbgo

0.0
Repository is archived
No commit activity in last 3 years
No release in over 3 years
Write concurrent program with Ruby in Golang style.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
 Dependencies

Runtime

~> 2.4
~> 0.1.3
 Project Readme

This project is trying to help writing concurrent program with ruby a little easier.

select_chan

select channels like golang in ruby

Chan is buffered or non-buffered, just like make(chan Type,n) or make(chan Type) in golang but not statically typed in my implementation.

example:

require 'rbgo'

include Rbgo::Channel

ch1 = Chan.new  # non-buffer channel
ch2 = Chan.new(2) # buffer channel 
ch3 = Chan.new(1)

ch3 << 'hello'

select_chan(
  on_read(chan: ch1){|obj, ok| # obj is the value read from ch1, ok indicates success or failure by close
    #do when read success
  },
  on_read(chan: ch2){
    #do when read success
  },
  on_write(chan: ch3, obj: 'world'){
    #do when write success
  }
){ puts 'call default block' }  


timeout_ch = Chan.after(4)
job_ch1 = Chan.perform do
  #do some job
  sleep 
end

job_ch2 = Chan.perform(timeout: 3) do
  #do some job
  sleep
end


select_chan(
  on_read(chan: job_ch1){
    p :job_ch1
  },
  on_read(chan: job_ch2){
    p :job_ch2
  },
  on_read(chan: timeout_ch){
    p 'timeout'
  }
)        
        
# Chan is Enumerable
 
ch = Chan.new

go do
  10.times do
    ch << Time.now
  end
end

ch.each do|obj|
  p obj
end

go

create lightweight routine like golang

Routine does not create a thread instead it is picked by a thread in pool.

Routine use fiber to yield processor, so you can call Fiber.yield, just like Gosched() in golang.

If routine raise exception it does not affect other routine. Check Routine#alive? to see if it is alive, and check Routine#error to see if something went wrong.

If routine is suspended by a blocking operation for enough long time, a new thread will be created to handle other routine. Suspended thread will exit when it completes current routine.

example:

require 'rbgo'

using Rbgo::CoRunExtensions

wg = Rbgo::WaitGroup.new

wg.add(1)
go do
  puts 'start' 

  Fiber.yield  # like Gosched()

  puts 'end'

  wg.done
end

wg.add(1)
go do
  sleep 1
  puts 'sleep end'
  wg.done
end

wg.wait
puts 'wg.wait done'
                             

# the job that takes very long time or never returns such as 'loop'.
# use Fiber.yield to let other yield jobs on the same thread to run.

go do           
  loop do
    # do some job
    # ...
    Fiber.yield 
  end
end      

# yield_io create a new thread to do operations, current routine will yield to do other task,
# it will come back to go on execute when yield_io block finish.  

require 'open-uri'
go do
  res = yield_io do
    f = open("http://www.google.com")
    f.read
  end
  p res
end
               

# Or use go!        

go! do         

# go! force a new thread be created to handle this block, and exit when finish.
# use go! to do blocking operations

end

# Once do block only once

once = Rbgo::Once.new
2.times do
  go do
    once.do { puts 'only once' }
  end
end



# Actor handle message sequentially
  
actor = Rbgo::Actor.new do|msg, actor|     
          case msg
          when :msg1
            # do some work 
          when :msg2
            # do some work
          when :msg3
            actor.send_msg :msg1 
          end
        end

actor.send_msg :msg1 #won't block     

# Actors can be parent-child relationship, and can be linked or monitored
 
supervisor = Rbgo::Actor.new do |msg, actor|
  case msg
  when Rbgo::ActorClosedMsg
    p msg.actor.close_reason # why actor closed
    actor.spawn_monitor(&msg.actor.handler) # spawn again
  end
end

10000.times do
  supervisor.spawn_monitor do |msg, _|  # when these actors closed, supervisor will receive ActorClosedMsg
                                        # and decide what to do
    p msg
  end
end

link1 = Rbgo::Actor.new do |msg, actor|
end
link2 = link1.spawn_link do
end

link1.close # any one of linked actors close will cause all other close

# link / unlink
# monitor / demonitor   
# when use spawn_xx , actors have parent-child relationship 


# TaskList do task in order but do it asynchronously, task may be executed in different thread
 
task_list = Rbgo::TaskList.new

task_1 = proc do
  puts 'first'
  1
end

task_2 = proc do |last_result|
  puts 'second'
  puts "last task result #{last_result}"
  sleep 5
  2
end

task_3 = proc do |last_result|
  puts 'third'
  puts "last task result #{last_result}"
  3
end

task_list << task_1
task_list.add(task_2, timeout: 2, skip_on_exception: true)
task_list << task_3

task_list.start
task_list.wait
p 'wait done.'                          
p task_list.complete?
p task_list.last_error
                       
# Reentrant/ReadWirte Mutex and Semaphore

m = Rbgo::ReentrantMutex.new
m.synchronize do
  m.synchronize do
    puts "I'm here!"
  end
end

m2 = Rbgo::RWMutex.new
5.times do
  go do
    m2.synchronize_r do
      puts "got the read lock"
    end
  end
end
go do
  m2.synchronize_w do
    puts "got the write lock"
    m2.synchronize_r do
      puts 'now, downgrade to read lock'
    end
  end
end

sleep 2

go do
  s = Rbgo::Semaphore.new(5)

  s.acquire(4) do
    puts 'got the permits'
  end

  s.acquire_all do
    puts 'got all available permits'
  end

  ok = s.try_acquire do
    puts 'try success'
  end
  puts 'try ok!' if ok
end

IOMachine

IOMachine wrap nio4r to do IO operation asynchronously.

support platforms: MRI, JRuby.

require 'rbgo'

io_r, io_w = IO.pipe
machine = Rbgo::IOMachine.new

receipt1 = machine.do_read(io_r, length: 100) # when length > 0, result nil if have not read anything yet when read complete.
receipt2 = machine.do_read(io_r, length: 100) # when length >0, result data bytes length up to 100 if have read some when read complete.
receipt3 = machine.do_read(io_r, length: 0) # when length == 0, result ""
receipt4 = machine.do_read(io_r, length: nil) # when length == nil, read until EOF. return "" if have not read anything.

io_w.write("a"*100)
io_w.write("b"*100)
io_w.write("c"*100)
io_w.close # cause EOF

# if the same io object, and the same read/write operation, operations will complete in sequence.
# so receipt1 complete first, and the receipt2 ...
# if the same io object, but not the same read/write operation,
# or not the same io object, operations will complete in arbitrary order.
receipt1.wait
p receipt1.res # aaa...
receipt2.wait
p receipt2.res # bbb...
receipt3.wait
p receipt3.res # ""
receipt4.wait
p receipt4.res # ccc...

io_r, io_w = IO.pipe
receipt1 = machine.do_write(io_w, str: "hello world!")

receipt1.wait
p receipt1.res # number of bytes written, may be less than str.bytesize if exception raised

yield_read / yield_write

use IOMachine to do IO operation asynchronously, but write code in a sequential way.

No callback and another callback...

require 'rbgo'
using Rbgo::CoRunExtensions

io_r, io_w = IO.pipe
go do
  data = io_r.yield_read(100) # have blocking semantics, but execute asynchronously
                              # this operation will *NOT* block the current thread
                              # when io operation complete, thread will resume to execute from this point.
  p data
end                   
              
require 'open-uri'
go do
  res = yield_io do
    f = open("http://www.google.com")
    f.read
  end
  p res
end

sleep 2

io_w.write("haha I'm crazy.")
io_w.close

# NOTICE yield_read / yield_write will do yield only in CoRun::Routine.new(*args, new_thread: false, &blk) block
# in other place yield_read / yield_write will do normal IO#read / IO#write
# for example: 
# go do
#   io_r.yield_read # will do yield
# end
# 
# go do
#   fiber = Fiber.new do
#     io_r.yield_read # will not do yield. just do normal IO#read
#   end
#   fiber.resume
# end
# 

NetworkService

open TCP or UDP service

Because service handles network request in async mode, it can handle many requests concurrently. If use some Non-GIL ruby implementations such as TruffleRuby or JRuby, it can utilize all your CPU cores.

Use IO#yield_read IO#yield_write to do IO operations

require 'rbgo'

using Rbgo::CoRunExtensions

#localhost, port 3000
tcp_service = Rbgo::NetworkServiceFactory.open_tcp_service(3000) do|sock, _|
  sock.yield_read_line("\r\n\r\n") # read http request
  sock.close_read
  sock.yield_write("HTTP/1.1 200 OK \r\n\r\nHello World!")
  sock.close_write
  sock.close
end                 

                                        

p "start tcp service: #{[tcp_service.host, tcp_service.port, tcp_service.type]}"           
sleep 5
tcp_service.stop
                            
          

#localhost, port auto pick
udp_service = Rbgo::NetworkServiceFactory.open_udp_service(0) do|msg, reply_msg|
  p msg
  reply_msg.reply("I receive your message")
end

p "start udp service: #{[udp_service.host, udp_service.port, udp_service.type]}"      
sleep 5
udp_service.stop


sleep