Director and workers now reconnect to pg automatically

Tracker story: https://www.pivotaltracker.com/story/show/88572280
parent 322c62a6
require "socket"
module Bosh::Dev::Sandbox
class ConnectionProxy
def initialize(remote_host, remote_port, listen_port)
@max_threads = 32
@accept_thread
@threads = []
@server_sockets = {}
@running = false
@remote_host = remote_host
@remote_port = remote_port
@listen_port = listen_port
end
def start_background
@accept_thread = Thread.new do
start
end
end
# This method is inspired by an example found at
# http://blog.bitmelt.com/2010/01/transparent-tcp-proxy-in-ruby-jruby.html
def start
if @running
raise "This ConnectionProxy is already running!"
end
@paused = false
@running = true
server = TCPServer.new(nil, @listen_port)
while @running
# Start a new thread for every client connection.
begin
sleep 0.1 while @paused
socket = server.accept
@threads << Thread.new(socket) do |client_socket|
proxy_single_connection(client_socket)
end
rescue Interrupt => i
server.close
ensure
# Clean up the dead threads, and wait until we have available threads.
@threads = @threads.select { |t| t.alive? ? true : (t.join; false) }
while @threads.size >= @max_threads
puts "Too many ConnectionProxy threads in use! Sleeping until some exit."
sleep 1
@threads = @threads.select { |t| t.alive? ? true : (t.join; false) }
end
end
end
end
def proxy_single_connection(client_socket)
begin
begin
server_socket = TCPSocket.new(@remote_host, @remote_port)
@server_sockets[Thread.current] = server_socket
rescue Errno::ECONNREFUSED
client_socket.close
raise
end
while true
sleep 0.1 while @paused
# Wait for data to be available on either socket.
(ready_sockets, dummy, dummy) = IO.select([client_socket, server_socket])
begin
ready_sockets.each do |socket|
data = socket.readpartial(4096)
if socket == client_socket
# Read from client, write to server.
server_socket.write data
server_socket.flush
else
# Read from server, write to client.
client_socket.write data
client_socket.flush
end
end
rescue EOFError
break
end
end
rescue StandardError => e
# this happens when we get EOF on the client or server socket
end
@server_sockets.delete(Thread.current)
server_socket.close rescue StandardError
client_socket.close rescue StandardError
end
def pause
@paused = true
end
def resume
@paused = false
end
def stop
if !@running
raise "This ConnectionProxy is not running!"
end
@paused = false
@running = false
if @accept_thread
@accept_thread.raise Interrupt
@accept_thread.join
end
@server_sockets.each do |thread, socket|
socket.close rescue StandardError
thread.join
end
end
end
end
......@@ -9,6 +9,7 @@ require 'bosh/dev/sandbox/nginx'
require 'bosh/dev/sandbox/workspace'
require 'bosh/dev/sandbox/director_config'
require 'bosh/dev/sandbox/port_provider'
require 'bosh/dev/sandbox/connection_proxy'
require 'bosh/dev/sandbox/services/director_service'
require 'bosh/dev/sandbox/services/nginx_service'
require 'cloud/dummy'
......@@ -41,6 +42,8 @@ module Bosh::Dev::Sandbox
attr_reader :director_service
attr_reader :port_provider
attr_reader :postgres_proxy
alias_method :db_name, :name
attr_reader :blobstore_storage_dir
......@@ -84,6 +87,10 @@ module Bosh::Dev::Sandbox
@nginx_service = NginxService.new(sandbox_root, director_port, director_ruby_port, uaa_port, @logger)
# all postgres connections go through this proxy (for testing automatic reconnect)
@postgres_proxy = ConnectionProxy.new("127.0.0.1", 5432, @port_provider.get_port(:postgres))
@postgres_proxy.start_background
director_config = sandbox_path(DirectorService::DIRECTOR_CONFIG)
director_tmp_path = sandbox_path('boshdir')
@director_service = DirectorService.new(director_ruby_port, redis_port, base_log_path, director_tmp_path, director_config, @logger)
......@@ -192,6 +199,8 @@ module Bosh::Dev::Sandbox
@uaa_process.stop
@database.drop_db
@postgres_proxy.stop
FileUtils.rm_f(dns_db_path)
FileUtils.rm_rf(agent_tmp_path)
FileUtils.rm_rf(blobstore_storage_dir)
......@@ -339,7 +348,7 @@ module Bosh::Dev::Sandbox
if db_opts[:type] == 'mysql'
@database = Mysql.new(@name, @logger, db_opts[:user], db_opts[:password])
else
@database = Postgresql.new(@name, @logger)
@database = Postgresql.new(@name, @logger, @port_provider.get_port(:postgres))
end
end
......
......@@ -5,14 +5,14 @@ module Bosh::Dev::Sandbox
class Postgresql
attr_reader :db_name, :username, :password, :adapter, :port
def initialize(db_name, logger, runner = Bosh::Core::Shell.new)
def initialize(db_name, logger, port, runner = Bosh::Core::Shell.new)
@db_name = db_name
@logger = logger
@runner = runner
@username = 'postgres'
@password = ''
@adapter = 'postgres'
@port = 5432
@port = port
end
# Assumption is that user running tests can
......
......@@ -45,7 +45,7 @@ namespace :spec do
if spec_path
"https_proxy= http_proxy= bundle exec rspec #{spec_path}"
else
"https_proxy= http_proxy= bundle exec parallel_test '#{test_path}'#{count}#{group} --group-by filesize --type rspec"
"https_proxy= http_proxy= bundle exec parallel_test '#{test_path}'#{count}#{group} --group-by filesize --type rspec -o '--format documentation'"
end
end
puts command
......
......@@ -3,7 +3,7 @@ require 'bosh/dev/sandbox/postgresql'
module Bosh::Dev::Sandbox
describe Postgresql do
subject(:postgresql) { described_class.new('fake_db_name', logger, runner) }
subject(:postgresql) { described_class.new('fake_db_name', logger, 9922, runner) }
let(:runner) { instance_double('Bosh::Core::Shell') }
describe '#create_db' do
......@@ -48,7 +48,7 @@ module Bosh::Dev::Sandbox
describe '#port' do
it 'has the correct port' do
expect(subject.port).to eq(5432)
expect(subject.port).to eq(9922)
end
end
end
......
......@@ -172,6 +172,8 @@ module Bosh::Director
db_config = db_config.merge(connection_options)
db = Sequel.connect(db_config)
db.extension :connection_validator
db.pool.connection_validation_timeout = -1
if logger
db.logger = logger
db.sql_log_level = :debug
......
require 'spec_helper'
describe 'recovering from postgres connection failures', type: :integration do
with_reset_sandbox_before_each
it 'can start a task after the postgres connections are cut and reconnected' do
target_and_login
current_sandbox.postgres_proxy.stop
sleep 1
current_sandbox.postgres_proxy.start_background
upload_stemcell
end
it 'can start a task before and after the postgres connections are cut and reconnected' do
target_and_login
upload_stemcell
current_sandbox.postgres_proxy.stop
sleep 1
current_sandbox.postgres_proxy.start_background
delete_stemcell
end
end
......@@ -66,6 +66,10 @@ module IntegrationExampleGroup
bosh_runner.run("upload stemcell #{spec_asset('valid_stemcell.tgz')}")
end
def delete_stemcell
bosh_runner.run("delete stemcell ubuntu-stemcell 1")
end
def set_deployment(options)
manifest_hash = options.fetch(:manifest_hash, Bosh::Spec::Deployments.simple_manifest)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment