Commit 66a72689 authored by jf+nz's avatar jf+nz Committed by Jonathan Fuerth and Nader Ziada

Second try: Director and workers now reconnect to pg automatically

Tracker story: https://www.pivotaltracker.com/story/show/88572280

Fixed connection proxy so it works on Linux and OS X

We now run the connection proxy in a separate process because sometimes
Ruby 2.1.5 on Linux will segfault while the socket & thread shutdown is
happening.
parent e028c620
#!/usr/bin/env ruby
require "socket"
class ConnectionProxy
def initialize(remote_host, remote_port, listen_port)
@max_threads = 32
@threads = []
@server_sockets = {}
@remote_host = remote_host
@remote_port = remote_port
@listen_port = listen_port
end
# This method is inspired by an example found at
# http://blog.bitmelt.com/2010/01/transparent-tcp-proxy-in-ruby-jruby.html
def start
puts "Starting to proxy connections from localhost:#{@listen_port} -> #{@remote_host}:#{@remote_port}"
server = TCPServer.new(nil, @listen_port)
while true
# Start a new thread for every client connection.
begin
socket = server.accept
@threads << Thread.new(socket) do |client_socket|
proxy_single_connection(client_socket)
end
rescue Interrupt => i
server.close
ensure
# Clean up the dead threads, and wait until we have available threads.
@threads = @threads.select { |t| t.alive? ? true : (t.join; false) }
while @threads.size >= @max_threads
puts "Too many ConnectionProxy threads in use! Sleeping until some exit."
sleep 1
@threads = @threads.select { |t| t.alive? ? true : (t.join; false) }
end
end
end
end
def proxy_single_connection(client_socket)
begin
begin
server_socket = TCPSocket.new(@remote_host, @remote_port)
@server_sockets[Thread.current] = server_socket
rescue Errno::ECONNREFUSED
client_socket.close
raise
end
while true
# Wait for data to be available on either socket.
(ready_sockets, dummy, dummy) = IO.select([client_socket, server_socket])
begin
ready_sockets.each do |socket|
data = socket.readpartial(4096)
if socket == client_socket
# Read from client, write to server.
server_socket.write data
server_socket.flush
else
# Read from server, write to client.
client_socket.write data
client_socket.flush
end
end
rescue EOFError
break
end
end
rescue StandardError => e
# this happens when we get EOF on the client or server socket
end
@server_sockets.delete(Thread.current)
server_socket.close rescue StandardError
client_socket.close rescue StandardError
end
end
cp = ConnectionProxy.new(ARGV[0], ARGV[1].to_i, ARGV[2].to_i)
cp.start
......@@ -11,6 +11,7 @@ require 'bosh/dev/sandbox/director_config'
require 'bosh/dev/sandbox/port_provider'
require 'bosh/dev/sandbox/services/director_service'
require 'bosh/dev/sandbox/services/nginx_service'
require 'bosh/dev/sandbox/services/connection_proxy_service'
require 'cloud/dummy'
require 'logging'
......@@ -41,6 +42,8 @@ module Bosh::Dev::Sandbox
attr_reader :director_service
attr_reader :port_provider
attr_reader :postgres_proxy
alias_method :db_name, :name
attr_reader :blobstore_storage_dir
......@@ -84,6 +87,10 @@ module Bosh::Dev::Sandbox
@nginx_service = NginxService.new(sandbox_root, director_port, director_ruby_port, uaa_port, @logger)
# all postgres connections go through this proxy (for testing automatic reconnect)
@postgres_proxy = ConnectionProxyService.new("127.0.0.1", 5432, @port_provider.get_port(:postgres), @logger)
@postgres_proxy.start
director_config = sandbox_path(DirectorService::DIRECTOR_CONFIG)
director_tmp_path = sandbox_path('boshdir')
@director_service = DirectorService.new(director_ruby_port, redis_port, base_log_path, director_tmp_path, director_config, @logger)
......@@ -192,6 +199,8 @@ module Bosh::Dev::Sandbox
@uaa_process.stop
@database.drop_db
@postgres_proxy.stop
FileUtils.rm_f(dns_db_path)
FileUtils.rm_rf(agent_tmp_path)
FileUtils.rm_rf(blobstore_storage_dir)
......@@ -339,7 +348,7 @@ module Bosh::Dev::Sandbox
if db_opts[:type] == 'mysql'
@database = Mysql.new(@name, @logger, db_opts[:user], db_opts[:password])
else
@database = Postgresql.new(@name, @logger)
@database = Postgresql.new(@name, @logger, @port_provider.get_port(:postgres))
end
end
......
......@@ -5,14 +5,14 @@ module Bosh::Dev::Sandbox
class Postgresql
attr_reader :db_name, :username, :password, :adapter, :port
def initialize(db_name, logger, runner = Bosh::Core::Shell.new)
def initialize(db_name, logger, port, runner = Bosh::Core::Shell.new)
@db_name = db_name
@logger = logger
@runner = runner
@username = 'postgres'
@password = ''
@adapter = 'postgres'
@port = 5432
@port = port
end
# Assumption is that user running tests can
......
module Bosh::Dev::Sandbox
class ConnectionProxyService
REPO_ROOT = File.expand_path('../../../../../../', File.dirname(__FILE__))
ASSETS_DIR = File.expand_path('bosh-dev/assets/sandbox', REPO_ROOT)
TCP_PROXY = File.join(ASSETS_DIR, 'proxy/tcp-proxy')
def initialize(forward_to_host, forward_to_port, listen_port, logger)
@logger = logger
@process = Service.new(%W[#{TCP_PROXY} #{forward_to_host} #{forward_to_port} #{listen_port}], {}, logger)
@socket_connector = SocketConnector.new("proxy #{listen_port} -> #{forward_to_host}:#{forward_to_port}", 'localhost', listen_port, logger)
end
def start
@process.start
@socket_connector.try_to_connect
end
def stop
@process.stop
end
end
end
......@@ -45,7 +45,7 @@ namespace :spec do
if spec_path
"https_proxy= http_proxy= bundle exec rspec #{spec_path}"
else
"https_proxy= http_proxy= bundle exec parallel_test '#{test_path}'#{count}#{group} --group-by filesize --type rspec"
"https_proxy= http_proxy= bundle exec parallel_test '#{test_path}'#{count}#{group} --group-by filesize --type rspec -o '--format documentation'"
end
end
puts command
......
......@@ -3,7 +3,7 @@ require 'bosh/dev/sandbox/postgresql'
module Bosh::Dev::Sandbox
describe Postgresql do
subject(:postgresql) { described_class.new('fake_db_name', logger, runner) }
subject(:postgresql) { described_class.new('fake_db_name', logger, 9922, runner) }
let(:runner) { instance_double('Bosh::Core::Shell') }
describe '#create_db' do
......@@ -48,7 +48,7 @@ module Bosh::Dev::Sandbox
describe '#port' do
it 'has the correct port' do
expect(subject.port).to eq(5432)
expect(subject.port).to eq(9922)
end
end
end
......
......@@ -172,6 +172,8 @@ module Bosh::Director
db_config = db_config.merge(connection_options)
db = Sequel.connect(db_config)
db.extension :connection_validator
db.pool.connection_validation_timeout = -1
if logger
db.logger = logger
db.sql_log_level = :debug
......
require 'spec_helper'
describe 'recovering from postgres connection failures', type: :integration do
with_reset_sandbox_before_each
it 'can start a task after the postgres connections are cut and reconnected' do
target_and_login
current_sandbox.postgres_proxy.stop
current_sandbox.postgres_proxy.start
upload_stemcell
end
it 'can start a task before and after the postgres connections are cut and reconnected' do
target_and_login
upload_stemcell
current_sandbox.postgres_proxy.stop
current_sandbox.postgres_proxy.start
delete_stemcell
end
end
......@@ -66,6 +66,10 @@ module IntegrationExampleGroup
bosh_runner.run("upload stemcell #{spec_asset('valid_stemcell.tgz')}")
end
def delete_stemcell
bosh_runner.run("delete stemcell ubuntu-stemcell 1")
end
def set_deployment(options)
manifest_hash = options.fetch(:manifest_hash, Bosh::Spec::Deployments.legacy_simple_manifest)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment