Mutexのロックが解除されない
現在P2Pのシステムを構築中です。
何らかのキー入力を検知して、サブスレッドを終了させています。
ソケットの接続を待ち受けているサブスレッドが終了された後にlocal_server.lock().unwrap().shutdown();
を実行したいのですが、Mutexのロックが解除されないのでlockができず、デッドロックの状態になってしまいます。
Mutexのロックが解除されないので、local_server.lock().unwrap().shutdown();
が実行できないという事態に陥っています。
local_server.lock().unwrap().shutdown();
を実行するために何ができるでしょうか?
追記:
start()
をreturnしたいのですが、shutdown()
をトリガーにしてreturnしたいと思っています。
しかし、そのトリガーであるshutdown()
が呼べないので困っています。
server1.rs
use super::server_core;
use std::net::{SocketAddr};
use std::io::{self, BufRead};
use std::{thread, time::Duration};
use std::sync::mpsc::{self, TryRecvError};
use std::sync::Arc;
use std::io::Error;
/// Runs one node: starts the server on a worker thread, waits for a line on
/// stdin, signals the worker to stop and finally calls `shutdown()`.
///
/// NOTE(review): `start()` is invoked through the server mutex. For as long as
/// `start()` blocks, the worker thread keeps that mutex locked, so the
/// `shutdown()` call at the end of this function cannot acquire it and this
/// function hangs. See the comments inside the worker closure.
pub fn main(my_addr: SocketAddr, parent_addr: SocketAddr) -> Result<(), Error> {
let mut server: server_core::ServerCore = server_core::ServerCore::new(my_addr, parent_addr);
// Second handle to the same `Arc<Mutex<ChildServerCore>>`; it stays on this
// thread and is used for `shutdown()` below.
let local_server = server.inner.clone();
// Channel used to tell the worker thread to terminate.
let (tx, rx) = mpsc::channel();
// `server` is moved into the closure, so the worker thread owns the `ServerCore`.
let handle = thread::spawn(move || loop {
let local_server = server.inner.clone();
// The `MutexGuard` returned by `lock()` is a temporary that is dropped at the
// end of this statement, i.e. only after `start()` has returned. While
// `start()` is waiting for connections the mutex therefore stays locked, and
// the `rx.recv()` below is not reached.
local_server.lock().unwrap().start();
// Blocks until the main thread sends the stop signal.
match rx.recv() {
Ok(_) => {
println!("Terminating.");
break;
}
Err(_) => {
// `recv()` fails once the sender has been dropped. Nothing is done here, so
// the loop runs again and calls `start()` a second time.
}
}
});
// Wait for any line of input on stdin; the content of the line is ignored.
let mut line = String::new();
let stdin = io::stdin();
let _ = stdin.lock().read_line(&mut line);
// Ask the worker to stop; a send error is ignored.
let _ = tx.send(());
// Presumably gives the worker time to react to the signal before shutting down.
thread::sleep(Duration::from_secs(5));
// Original note: "can't call". This `lock()` blocks for as long as the worker
// thread still holds the guard taken for `start()`.
local_server.lock().unwrap().shutdown();
// Joining the worker is disabled, so `handle` is otherwise unused.
// handle.join().unwrap();
Ok(())
}
server_core.rs
use super::connection_manager;
use std::net::{SocketAddr, IpAddr, Ipv4Addr};
use std::sync::{Arc, Mutex};
// Lifecycle state codes stored in `ChildServerCore::server_state`:
// STATE_INIT = 1
// STATE_STANDBY = 2
// STATE_CONNECTED_TO_NETWORK = 3
// STATE_SHUTTING_DOWN = 4
/// Mutable state of one server node; shared between threads via `ServerCore::inner`.
#[derive(Debug)]
pub struct ChildServerCore {
/// Current lifecycle state code (see the list above). `ServerCore::new` sets it
/// to 1, `start` to 2 and `join_network` to 3.
pub server_state: u8,
/// Address of this node; also handed to the connection manager on construction.
pub addr: SocketAddr,
/// Address of the node to join through. `0.0.0.0:0` means "no parent", in which
/// case `join_network` treats this node as the genesis core node.
pub parent_addr: SocketAddr,
/// Connection manager that performs the actual network I/O for this node.
pub cm: connection_manager::ConnectionManager
}
/// Handle to a server node whose state lives behind an `Arc<Mutex<_>>`.
#[derive(Debug)]
pub struct ServerCore {
/// Shared node state. Clone the `Arc` to give another thread access; every
/// access has to lock the mutex first.
pub inner: Arc<Mutex<ChildServerCore>>
}
impl ServerCore {
    /// Builds a server node for `addr` that will later join the network
    /// through `parent_addr`.
    ///
    /// The node starts in state 1 (init) and gets a freshly created
    /// connection manager for `addr`. The configured IP address is printed
    /// to stdout once the node has been assembled.
    pub fn new(addr: SocketAddr, parent_addr: SocketAddr) -> ServerCore {
        let core = ChildServerCore {
            server_state: 1,
            addr,
            parent_addr,
            cm: connection_manager::ConnectionManager::new(addr),
        };
        let shared = Arc::new(Mutex::new(core));
        println!("Server IP address is set to ...{}", addr.ip());
        ServerCore { inner: shared }
    }
}
impl ChildServerCore {
    /// `server_state` value while the node is started but has not joined a network.
    const STATE_STANDBY: u8 = 2;
    /// `server_state` value once the node has contacted its parent node.
    const STATE_CONNECTED_TO_NETWORK: u8 = 3;

    /// Marks the node as standing by and starts the connection manager.
    ///
    /// This method is normally reached through the mutex in
    /// `ServerCore::inner`. If the connection manager's `start()` blocks, the
    /// caller keeps that mutex locked for the whole time and no other thread
    /// can call any method on this node (including `shutdown`).
    pub fn start(&mut self) {
        self.server_state = Self::STATE_STANDBY;
        self.cm.start();
    }

    /// Joins the network through `parent_addr`.
    ///
    /// A parent address of `0.0.0.0:0` means there is no parent: the node
    /// then runs as the genesis core node and nothing is sent.
    ///
    /// # Errors
    /// Returns the connection manager's error when the parent cannot be
    /// contacted. The state is switched to "connected" only after the
    /// parent has been contacted successfully.
    pub fn join_network(&mut self) -> Result<(), failure::Error> {
        let unset_parent = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
        if self.parent_addr == unset_parent {
            println!("This server is running as Genesis Core Node ...");
            return Ok(());
        }
        // Propagate the failure instead of panicking: the signature already
        // promises a `Result` to the caller.
        self.cm.join_network(self.parent_addr)?;
        self.server_state = Self::STATE_CONNECTED_TO_NETWORK;
        Ok(())
    }

    /// Returns the current lifecycle state code.
    pub fn get_my_current_state(&self) -> u8 {
        self.server_state
    }

    /// Announces the shutdown on stdout.
    ///
    /// The state change and the network-level close are still disabled
    /// (kept below as written by the original author).
    pub fn shutdown(&mut self) {
        println!("Shutdown server...");
        // self.server_state = 4;
        // self.cm.connection_close().unwrap();
    }
}
connection_manager.rs
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream, SocketAddr, IpAddr, Ipv4Addr, Shutdown};
use std::{str, thread, time::Duration};
use serde_json::{Value};
use std::sync::{Arc, Mutex};
use crossbeam;
use super::message;
// const PROTOCOL_NAME: &'static str = "bthereum";
// const VERSION: &'static str = "0.1.0";
// const MSG_ADD: u8 = 1;
// const MSG_REMOVE: u8 = 2;
// const MSG_CORE_LIST: u8 = 3;
// const MSG_REQUEST_CORE_LIST: u8 = 4;
// const MSG_PING: u8 = 5;
// const MSG_ADD_EDGE: u8 = 6;
// const MSG_REMOVE_EDGE: u8 = 7;
// const NULL = 8;
// const ERR_PROTOCOL_UNMATCH: u8 = 1;
// const ERR_VERSION_UNMATCH: u8 = 2;
// const OK_WITH_PAYLOAD: u8 = 3;
// const OK_WITHOUT_PAYLOAD: u8 = 4;
/// Connection state of one node; always accessed through `ConnectionManager::inner`.
#[derive(Debug)]
pub struct ChildConnectionManager {
// Address this node listens on for incoming peer connections.
addr: SocketAddr,
// Address of the parent node. Starts as `0.0.0.0:0` and is overwritten by
// `ConnectionManager::join_network`.
parent_addr: SocketAddr,
// Known core nodes. The node's own address is added on construction.
core_node_set: Vec<SocketAddr>
}
/// Front end for a node's peer-to-peer connections.
#[derive(Debug)]
pub struct ConnectionManager {
// Shared connection state; clones of this `Arc` are handed to the code that
// handles incoming connections.
inner: Arc<Mutex<ChildConnectionManager>>
}
impl ConnectionManager {
pub fn new(addr: SocketAddr) -> ConnectionManager {
println!("Initializing ConnectionManager...");
let parent_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(000, 0, 0, 0)), 000);
let manager = ConnectionManager {
inner: Arc::new(
Mutex::new(
ChildConnectionManager {
addr: addr,
parent_addr: parent_addr,
core_node_set: Vec::new()
}
)
)
};
let local_self = manager.inner.clone();
local_self.lock().unwrap()._add_peer(addr).unwrap();
return manager;
}
pub fn start(&mut self) {
self._wait_for_access().unwrap();
}
fn _wait_for_access(&mut self) -> Result<(), failure::Error> {
let _local_self = self.inner.clone();
let listener = TcpListener::bind(_local_self.lock().unwrap().addr).unwrap();
loop {
let local_self = self.inner.clone();
println!("Waiting for the connection...");
let (stream, addr) = listener.accept()?;
println!("Connected by... {}", addr);
let handle = thread::spawn(move|| {
local_self.lock().unwrap()._handle_message(stream, addr).unwrap();
});
handle.join().unwrap();
}
}
pub fn connection_close(&mut self) -> Result<(), failure::Error> {
let local_self = self.inner.clone();
let stream = TcpStream::connect(local_self.lock().unwrap().addr)?;
stream.shutdown(Shutdown::Both).expect("shutdown call failed");
let temp_vec: Vec<SocketAddr> = Vec::new();
// MSG_REMOVE
let msg = message::build(2, local_self.lock().unwrap().addr.port(), &temp_vec).unwrap();
let result = local_self.lock().unwrap().send_msg(local_self.lock().unwrap().parent_addr, &msg).unwrap();
if result != () {
local_self.lock().unwrap()._remove_peer(local_self.lock().unwrap().parent_addr).unwrap();
}
return Ok(());
}
pub fn join_network(&mut self, address: SocketAddr) -> Result<(), failure::Error> {
let local_self = self.inner.clone();
local_self.lock().unwrap().parent_addr = address;
self._connect_to_p2pnw(address).unwrap();
return Ok(());
}
fn _connect_to_p2pnw(&mut self, address: SocketAddr) -> Result<(), failure::Error> {
let local_self = self.inner.clone();
let mut stream = TcpStream::connect(address)?;
let temp_vec: Vec<SocketAddr> = Vec::new();
// MSG_ADD
let msg = message::build(1, local_self.lock().unwrap().addr.port(), &temp_vec).unwrap();
let string: String = msg.to_string();
// ここでエラー
stream.write_all(string.as_bytes())?;
stream.shutdown(Shutdown::Both).expect("shutdown call failed");
return Ok(());
}
}
impl ChildConnectionManager {
    /// Reads messages from `stream` and reacts to them.
    ///
    /// Returns after the first message with a recognised status, or when the
    /// peer closes the connection. A message with an unexpected status is
    /// logged and the next message is read from the same connection.
    ///
    /// Each read is parsed as one complete JSON document, so a message that
    /// is larger than the 1024-byte buffer or arrives split over several
    /// reads is rejected as malformed.
    ///
    /// # Errors
    /// Fails on read errors, invalid UTF-8 and malformed JSON. The input
    /// comes from the network, so none of these may panic.
    fn _handle_message(&mut self, mut stream: TcpStream, addr: SocketAddr) -> Result<(), failure::Error> {
        let mut buffer = [0u8; 1024];
        loop {
            let nbytes = stream.read(&mut buffer)?;
            if nbytes == 0 {
                println!("Connection closed.");
                return Ok(());
            }
            let data: &str = str::from_utf8(&buffer[..nbytes])?;
            let message: Value = serde_json::from_str(data)?;
            let (result, reason, cmd, peer_port, payload) = message::parse(&message);
            // Combine the sender's host with the port it accepts messages on.
            let connect_addr = SocketAddr::new(addr.ip(), peer_port);
            println!("result: {}, reason: {}, cmd: {}, peer_port: {}, payload: {:?}", result, reason, cmd, peer_port, payload);

            match (result.as_str(), reason) {
                // ERR_PROTOCOL_UNMATCH
                ("error", 1) => {
                    println!("Error: Protocol name is not matched");
                    return Ok(());
                }
                // ERR_VERSION_UNMATCH
                ("error", 2) => {
                    println!("Error: Protocol version is not matched");
                    return Ok(());
                }
                // OK_WITHOUT_PAYLOAD
                ("ok", 4) => {
                    match cmd {
                        // MSG_ADD
                        1 => {
                            println!("ADD node request was received!!");
                            self._add_peer(connect_addr)?;
                            // NOTE(review): this compares the connection's source
                            // address, not `connect_addr` - confirm that is intended.
                            if addr != self.addr {
                                // MSG_CORE_LIST
                                let msg = message::build(3, addr.port(), &self.core_node_set).unwrap();
                                self.send_msg_to_all_peers(&msg);
                            }
                        }
                        // MSG_REMOVE
                        2 => {
                            println!("REMOVE node request was received!! from: {}", addr);
                            self._remove_peer(connect_addr)?;
                            // MSG_CORE_LIST
                            let msg = message::build(3, addr.port(), &self.core_node_set).unwrap();
                            self.send_msg_to_all_peers(&msg);
                        }
                        // MSG_PING
                        5 => {}
                        // MSG_REQUEST_CORE_LIST
                        4 => {
                            println!("List for Core nodes was requested!!");
                            // MSG_CORE_LIST
                            let msg = message::build(3, addr.port(), &self.core_node_set).unwrap();
                            // A requester that cannot be reached is dropped
                            // from the list instead of aborting the handler.
                            if let Err(err) = self.send_msg(connect_addr, &msg) {
                                println!("Could not reach {}: {}", connect_addr, err);
                                self._remove_peer(connect_addr)?;
                            }
                        }
                        _ => {
                            println!("received unknown command: {}", cmd);
                        }
                    }
                    return Ok(());
                }
                // OK_WITH_PAYLOAD
                ("ok", 3) => {
                    match cmd {
                        // MSG_CORE_LIST
                        3 => {
                            // The received list overwrites the local one
                            // unchecked, which is not secure.
                            println!("Refresh the core node list...");
                            println!("latest core node list:{:?}", payload);
                            self.core_node_set = payload;
                        }
                        _ => {
                            println!("received unknown command: {}", cmd);
                        }
                    }
                    return Ok(());
                }
                _ => {
                    println!("Unexpected status: {}, {}", result, reason);
                }
            }
        }
    }

    /// Appends `peer` to the core node list and prints the updated list.
    fn _add_peer(&mut self, peer: SocketAddr) -> Result<(), failure::Error> {
        println!("Adding peer: ({})", peer);
        self.core_node_set.push(peer);
        println!("Current Core List: {:?}", self.core_node_set);
        Ok(())
    }

    /// Removes every occurrence of `peer` from the core node list.
    ///
    /// Does nothing (and prints nothing) when `peer` is not in the list.
    fn _remove_peer(&mut self, peer: SocketAddr) -> Result<(), failure::Error> {
        let before = self.core_node_set.len();
        // `retain` removes in place; calling `remove(i)` while iterating over
        // the original index range runs past the end of the shortened list.
        self.core_node_set.retain(|known| *known != peer);
        if self.core_node_set.len() != before {
            println!("Removing peer: ({})", peer);
            println!("Current Core List: {:?}", self.core_node_set);
        }
        Ok(())
    }

    /// Pings every known node every 30 minutes and drops those that do not answer.
    ///
    /// When at least one node was dropped, the updated list is sent to all
    /// remaining peers. This function never returns; it keeps `self`
    /// borrowed for as long as it runs.
    fn _check_peers_connection(&mut self) {
        loop {
            println!("check_peers_connection was called");
            let candidates = self.core_node_set.clone();
            // A failed ping means the peer is dead; it must not panic.
            let dead_core_node_set: Vec<SocketAddr> = candidates
                .into_iter()
                .filter(|peer| !self._is_alive(*peer).unwrap_or(false))
                .collect();
            let changed = !dead_core_node_set.is_empty();
            if changed {
                self.core_node_set.retain(|peer| !dead_core_node_set.contains(peer));
                println!("Removed {:#?}", dead_core_node_set);
            }
            println!("current core node list: {:#?}", self.core_node_set);
            if changed {
                // MSG_CORE_LIST
                let msg = message::build(3, self.addr.port(), &self.core_node_set).unwrap();
                self.send_msg_to_all_peers(&msg);
            }
            // Re-check periodically; a loop keeps the stack flat where
            // self-recursion would grow it on every round.
            thread::sleep(Duration::from_secs(1800));
        }
    }

    /// Sends a PING to `target`.
    ///
    /// Returns `Ok(true)` when the ping could be delivered and an error
    /// otherwise; `Ok(false)` is never produced.
    fn _is_alive(&mut self, target: SocketAddr) -> Result<bool, failure::Error> {
        let mut stream = TcpStream::connect(target)?;
        // MSG_PING
        let temp_port: u16 = 0;
        let no_payload: Vec<SocketAddr> = Vec::new();
        let msg = message::build(5, temp_port, &no_payload).unwrap();
        // Serialise the same way as every other sender in this file.
        let text: String = msg.to_string();
        stream.write_all(text.as_bytes())?;
        stream.shutdown(Shutdown::Both)?;
        Ok(true)
    }

    /// Sends `msg` to every known node except this one.
    ///
    /// Peers that cannot be reached are removed from the core node list.
    fn send_msg_to_all_peers(&mut self, msg: &Value) {
        println!("send_msg_to_all_peers was called!");
        let own_addr = self.addr;
        // Work on a snapshot so the list can be modified afterwards.
        let targets: Vec<SocketAddr> = self
            .core_node_set
            .iter()
            .cloned()
            .filter(|peer| *peer != own_addr)
            .collect();
        let mut unreachable: Vec<SocketAddr> = Vec::new();
        for target in targets {
            println!("message will be sent to ...{}", target);
            if let Err(err) = self.send_msg(target, msg) {
                println!("Could not reach {}: {}", target, err);
                unreachable.push(target);
            }
        }
        if !unreachable.is_empty() {
            self.core_node_set.retain(|peer| !unreachable.contains(peer));
            println!("Current Core List: {:?}", self.core_node_set);
        }
    }

    /// Connects to `socket_address`, writes `msg` as JSON text and closes the connection.
    ///
    /// # Errors
    /// Returns the I/O error when connecting, writing or closing fails.
    fn send_msg(&mut self, socket_address: SocketAddr, msg: &Value) -> Result<(), failure::Error> {
        let mut stream = TcpStream::connect(socket_address)?;
        let text: String = msg.to_string();
        stream.write_all(text.as_bytes())?;
        stream.shutdown(Shutdown::Both)?;
        Ok(())
    }
}