bump version to v2.1.1 (#533)

This commit is contained in:
Sijie.Sun
2024-12-24 10:40:57 -05:00
committed by GitHub
parent 2f4a097787
commit 34e4e907a9
12 changed files with 55 additions and 30 deletions

View File

@@ -38,6 +38,33 @@ impl<L: TunnelListener + 'static> StandAloneServer<L> {
&self.registry
}
async fn serve_loop(
listener: &mut L,
inflight: Arc<AtomicU32>,
registry: Arc<ServiceRegistry>,
tasks: Arc<Mutex<JoinSet<()>>>,
) -> Result<(), Error> {
listener
.listen()
.await
.with_context(|| "failed to listen")?;
loop {
let tunnel = listener.accept().await?;
let registry = registry.clone();
let inflight_server = inflight.clone();
inflight_server.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tasks.lock().unwrap().spawn(async move {
let server =
BidirectRpcManager::new().set_rx_timeout(Some(Duration::from_secs(60)));
server.rpc_server().registry().replace_registry(&registry);
server.run_with_tunnel(tunnel);
server.wait().await;
inflight_server.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
});
}
}
pub async fn serve(&mut self) -> Result<(), Error> {
let tasks = self.tasks.clone();
let mut listener = self.listener.take().unwrap();
@@ -45,28 +72,24 @@ impl<L: TunnelListener + 'static> StandAloneServer<L> {
join_joinset_background(tasks.clone(), "standalone server tasks".to_string());
listener
.listen()
.await
.with_context(|| "failed to listen")?;
let inflight_server = self.inflight_server.clone();
self.tasks.lock().unwrap().spawn(async move {
while let Ok(tunnel) = listener.accept().await {
let registry = registry.clone();
let inflight_server = inflight_server.clone();
inflight_server.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tasks.lock().unwrap().spawn(async move {
let server =
BidirectRpcManager::new().set_rx_timeout(Some(Duration::from_secs(60)));
server.rpc_server().registry().replace_registry(&registry);
server.run_with_tunnel(tunnel);
server.wait().await;
inflight_server.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
});
loop {
let ret = Self::serve_loop(
&mut listener,
inflight_server.clone(),
registry.clone(),
tasks.clone(),
)
.await;
if let Err(e) = ret {
tracing::error!(?e, url = ?listener.local_url(), "serve_loop exit unexpectedly");
println!("standalone serve_loop exit unexpectedly: {:?}", e);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
panic!("standalone server listener exit");
});
Ok(())

View File

@@ -287,6 +287,8 @@ async fn standalone_rpc_test() {
server.registry().register(service, "test");
server.serve().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut client = StandAloneClient::new(TcpTunnelConnector::new(
"tcp://127.0.0.1:33455".parse().unwrap(),
));

View File

@@ -33,6 +33,7 @@ impl TcpTunnelListener {
#[async_trait]
impl TunnelListener for TcpTunnelListener {
async fn listen(&mut self) -> Result<(), TunnelError> {
self.listener = None;
let addr = check_scheme_and_get_socket_addr::<SocketAddr>(&self.addr, "tcp")?;
let socket2_socket = socket2::Socket::new(