mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-14 05:37:23 +08:00
fix ring buffer stuck when using multi thread runtime
This commit is contained in:
@@ -1,17 +1,15 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Poll, Waker},
|
||||
fmt::Debug,
|
||||
sync::Arc,
|
||||
task::{ready, Poll},
|
||||
};
|
||||
|
||||
use atomicbox::AtomicOptionBox;
|
||||
use crossbeam_queue::ArrayQueue;
|
||||
use async_ringbuf::{traits::*, AsyncHeapCons, AsyncHeapProd, AsyncHeapRb};
|
||||
use crossbeam::atomic::AtomicCell;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{Sink, Stream};
|
||||
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use tokio::sync::{
|
||||
@@ -30,83 +28,30 @@ use super::{
|
||||
|
||||
static RING_TUNNEL_CAP: usize = 128;
|
||||
|
||||
#[derive(Debug)]
|
||||
type RingLock = parking_lot::Mutex<()>;
|
||||
|
||||
type RingItem = SinkItem;
|
||||
|
||||
pub struct RingTunnel {
|
||||
id: Uuid,
|
||||
ring: ArrayQueue<SinkItem>,
|
||||
closed: AtomicBool,
|
||||
|
||||
wait_for_new_item: AtomicOptionBox<Waker>,
|
||||
wait_for_empty_slot: AtomicOptionBox<Waker>,
|
||||
ring_cons_impl: AtomicCell<Option<AsyncHeapCons<RingItem>>>,
|
||||
ring_prod_impl: AtomicCell<Option<AsyncHeapProd<RingItem>>>,
|
||||
}
|
||||
|
||||
impl RingTunnel {
|
||||
fn wait_for_new_item<T>(&self, cx: &mut std::task::Context<'_>) -> Poll<T> {
|
||||
let ret = self
|
||||
.wait_for_new_item
|
||||
.swap(Some(Box::new(cx.waker().clone())), Ordering::AcqRel);
|
||||
if let Some(old_waker) = ret {
|
||||
assert!(old_waker.will_wake(cx.waker()));
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn wait_for_empty_slot<T>(&self, cx: &mut std::task::Context<'_>) -> Poll<T> {
|
||||
let ret = self
|
||||
.wait_for_empty_slot
|
||||
.swap(Some(Box::new(cx.waker().clone())), Ordering::AcqRel);
|
||||
if let Some(old_waker) = ret {
|
||||
assert!(old_waker.will_wake(cx.waker()));
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn notify_new_item(&self) {
|
||||
if let Some(w) = self.wait_for_new_item.take(Ordering::AcqRel) {
|
||||
tracing::trace!(?self.id, "notify new item");
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_empty_slot(&self) {
|
||||
if let Some(w) = self.wait_for_empty_slot.take(Ordering::AcqRel) {
|
||||
tracing::trace!(?self.id, "notify empty slot");
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
|
||||
fn id(&self) -> &Uuid {
|
||||
&self.id
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.ring.len()
|
||||
}
|
||||
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.ring.capacity()
|
||||
}
|
||||
|
||||
fn close(&self) {
|
||||
tracing::info!("close ring tunnel {:?}", self.id);
|
||||
self.closed
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
self.notify_new_item();
|
||||
}
|
||||
|
||||
fn closed(&self) -> bool {
|
||||
self.closed.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn new(cap: usize) -> Self {
|
||||
let id = Uuid::new_v4();
|
||||
let ring_impl = AsyncHeapRb::new(cap);
|
||||
let (ring_prod_impl, ring_cons_impl) = ring_impl.split();
|
||||
Self {
|
||||
id: id.clone(),
|
||||
ring: ArrayQueue::new(cap),
|
||||
closed: AtomicBool::new(false),
|
||||
|
||||
wait_for_new_item: AtomicOptionBox::new(None),
|
||||
wait_for_empty_slot: AtomicOptionBox::new(None),
|
||||
ring_cons_impl: AtomicCell::new(Some(ring_cons_impl)),
|
||||
ring_prod_impl: AtomicCell::new(Some(ring_prod_impl)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,14 +62,23 @@ impl RingTunnel {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
impl Debug for RingTunnel {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RingTunnel").field("id", &self.id).finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RingStream {
|
||||
tunnel: Arc<RingTunnel>,
|
||||
id: Uuid,
|
||||
ring_cons_impl: AsyncHeapCons<RingItem>,
|
||||
}
|
||||
|
||||
impl RingStream {
|
||||
pub fn new(tunnel: Arc<RingTunnel>) -> Self {
|
||||
Self { tunnel }
|
||||
Self {
|
||||
id: tunnel.id.clone(),
|
||||
ring_cons_impl: tunnel.ring_cons_impl.take().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,56 +89,39 @@ impl Stream for RingStream {
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let s = self.get_mut();
|
||||
let ret = s.tunnel.ring.pop();
|
||||
let ret = ready!(self.get_mut().ring_cons_impl.poll_next_unpin(cx));
|
||||
match ret {
|
||||
Some(v) => {
|
||||
s.tunnel.notify_empty_slot();
|
||||
return Poll::Ready(Some(Ok(v)));
|
||||
}
|
||||
None => {
|
||||
if s.tunnel.closed() {
|
||||
tracing::warn!("ring recv tunnel {:?} closed", s.tunnel.id());
|
||||
return Poll::Ready(None);
|
||||
} else {
|
||||
tracing::trace!("waiting recv buffer, id: {}", s.tunnel.id());
|
||||
}
|
||||
s.tunnel.wait_for_new_item(cx)
|
||||
}
|
||||
Some(item) => Poll::Ready(Some(Ok(item))),
|
||||
None => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RingSink {
|
||||
tunnel: Arc<RingTunnel>,
|
||||
impl Debug for RingStream {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RingStream")
|
||||
.field("id", &self.id)
|
||||
.field("len", &self.ring_cons_impl.base().occupied_len())
|
||||
.field("cap", &self.ring_cons_impl.base().capacity())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RingSink {
|
||||
fn drop(&mut self) {
|
||||
self.tunnel.close();
|
||||
}
|
||||
pub struct RingSink {
|
||||
id: Uuid,
|
||||
ring_prod_impl: AsyncHeapProd<RingItem>,
|
||||
}
|
||||
|
||||
impl RingSink {
|
||||
pub fn new(tunnel: Arc<RingTunnel>) -> Self {
|
||||
Self { tunnel }
|
||||
}
|
||||
|
||||
pub fn push_no_check(&self, item: SinkItem) -> Result<(), TunnelError> {
|
||||
if self.tunnel.closed() {
|
||||
return Err(TunnelError::Shutdown);
|
||||
Self {
|
||||
id: tunnel.id.clone(),
|
||||
ring_prod_impl: tunnel.ring_prod_impl.take().unwrap(),
|
||||
}
|
||||
|
||||
tracing::trace!(id=?self.tunnel.id(), ?item, "send buffer");
|
||||
let _ = self.tunnel.ring.push(item);
|
||||
self.tunnel.notify_new_item();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn has_empty_slot(&self) -> bool {
|
||||
self.tunnel.len() < self.tunnel.capacity()
|
||||
pub fn try_send(&mut self, item: RingItem) -> Result<(), RingItem> {
|
||||
self.ring_prod_impl.try_push(item)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,37 +132,41 @@ impl Sink<SinkItem> for RingSink {
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
let self_mut = self.get_mut();
|
||||
if !self_mut.has_empty_slot() {
|
||||
if self_mut.tunnel.closed() {
|
||||
return Poll::Ready(Err(TunnelError::Shutdown));
|
||||
}
|
||||
self_mut.tunnel.wait_for_empty_slot(cx)
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
let ret = ready!(self.get_mut().ring_prod_impl.poll_ready_unpin(cx));
|
||||
Poll::Ready(ret.map_err(|_| TunnelError::Shutdown))
|
||||
}
|
||||
|
||||
fn start_send(self: std::pin::Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
|
||||
self.push_no_check(item)
|
||||
self.get_mut()
|
||||
.ring_prod_impl
|
||||
.start_send_unpin(item)
|
||||
.map_err(|_| TunnelError::Shutdown)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
if self.tunnel.closed() {
|
||||
return Poll::Ready(Err(TunnelError::Shutdown));
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
let ret = ready!(self.get_mut().ring_prod_impl.poll_flush_unpin(cx));
|
||||
Poll::Ready(ret.map_err(|_| TunnelError::Shutdown))
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.tunnel.close();
|
||||
Poll::Ready(Ok(()))
|
||||
let ret = ready!(self.get_mut().ring_prod_impl.poll_close_unpin(cx));
|
||||
Poll::Ready(ret.map_err(|_| TunnelError::Shutdown))
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for RingSink {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RingSink")
|
||||
.field("id", &self.id)
|
||||
.field("len", &self.ring_prod_impl.base().occupied_len())
|
||||
.field("cap", &self.ring_prod_impl.base().capacity())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user