随着 Solana 域名服务的发展,我们希望域名能够在第三方市场上轻松转售。为了实现这一目标,我们将域名作为 NFTs 通证化。这使我们的用户能够在二级市场上出售他们注册的域名,如 MagicEden 或 Hyperspace。这就产生了一个问题:各种平台都提供域名,这意味着用户很难发现他们可能感兴趣的域名。我们需要的是一个更大的综合图示,了解各个市场上的情况。
我们在不久前推出了 SNS-Optics 以解决这个问题。它整合在naming.bonfida.org主页面中,提供了所有交易 Solana 域名服务主要市场的综合情况,使用户能够快速看到一个域名是否可以出售以及在哪里出售。
运行这项服务需要从三个来源不断收集信息:MagicEden API、Hyperspace API 和 Solana 本身。起初,我们设计系统的方式是每个数据源都有自己的自定义收集器,很少有共享代码。这意味着维护、优化和增加新的功能被证明是相当费力的。我们选择了这种简单的解决方案,以尽可能快地推出 SNS-愿景,但这产生了大量的技术工作。这些服务本身是相当不稳定的,需要不断地监测和重新启动。
几个月后,我们终于抽出时间来彻底重写采集器的基础设施,有三个目标:
在新的收集器基础设施中最大限度地实现代码共享的关键,是将每个数据源的具体内容隔离在一个具有最小功能的对象中,但它实现了一个共同的特征。思考一下收集器的实际工作,它从 API 中检索原始信息,解析它,然后按顺序检索额外的信息,然后将结果数据提交到数据库。整个操作在一个循环中运行,每个迭代都有一个偏移量,偏移量可以是一个数字,甚至可以是一个事务签名。例如,看一个 Solana 程序的活动,我们会看交易并检索最近的交易,从已经在我们数据库中的最新交易开始。然后,我们从最近处理的交易开始,检索更多、更近的交易。最近的交易的签名是这里的偏移。一旦完成了这些,我们再开始循环。
首先要注意的是,我们实际上是在看非常不同种类的信息,这些信息是由非常不同的东西来索引的。交易是由签名索引的,而 MagicEden Api 中的列表是由一个整数索引的。在程序账户的情况下,甚至没有一个偏移量,因为我们只是在一个请求中检索我们感兴趣的全部账户。我们的软件架构必须要考虑到所有这些不同的情况。让我们试着旋风式地参观一下我们是如何构造我们的代码来同时处理每个数据源的。
所有不同的数据源都必须实现一个通用特性,我们称之为数据源特征。下面是一个精简版的特征定义。
#[async_trait]
pub trait DataSource<RawRecord> {
type RawOffset;
type StopOffset;
async fn get_batch(
&self,
offset: Option<Self::RawOffset>,
) -> Result<
(
Option<Self::RawOffset>,
Vec<WithOffset<RawRecord, Self::RawOffset>>,
),
crate::ErrorWithTrace,
>;
async fn process_raw_record(
&self,
record: RawRecord,
) -> Result<Vec<Record>, crate::ErrorWithTrace>;
}
这里的诀窍是将原始的批处理检索与所包含的记录的单独处理脱钩。这使我们能够设计一个通用的基础设施,处理数据源对象,而不必担心它们的具体细节。get_batch方法将处理对 Apis 或 Solana Rpc 节点的所有调用,而process_raw_record方法将处理每个记录。然后收集器本身使用tokio异步运行时尽可能地并行处理许多记录。
当不需要偏移逻辑时,我们可以将 RawOffse
类型定义为()
.
CollectorRunn
泛型 和 Collector
特征为了处理收集工作本身,我们使用了以下方法 CollectorRunner
:
pub struct CollectorRunner<RawRecord, T: DataSource<RawRecord>> {
// The data_source is in an `Arc` to facilitate spawning parallel `process_raw_record` and `get_batch` tasks
data_source: Arc<T>,
// A task feeds a queue `RawRecord` objects which can then be processed in parallel
raw_queue: SyncedReceiver<WithOffset<RawRecord, T::RawOffset>>,
..
}
现在,如果我们想在 Rust 中能够处理动态类型的对象,我们需要一个共同的特性,即Collector特性,它由我们的CollectorRunner实现:
#[async_trait]
pub trait Collector {
async fn get_next_records(&self) -> Result<Option<Vec<Record>>, crate::ErrorWithTrace>;
}
最后,我们可以使用相同的逻辑来处理所有不同类型的CollectorRunner实例,通过对它们进行封装,这是 Rust 的一个常见技巧,可以使用动态类型:Box,这意味着我们可以将所有的收集器聚集在一个Vec,并同时启动它们。
动态类型具有最小的运行时间成本,但对于性能关键的逻辑来说,它可以被迅速建立。在这种情况下,调用在堆栈中的位置很低,我们不应该担心这个问题。然而,作为一般的经验法则,我们应该总是批判性地思考动态类型是否是手头问题的合适解决方案。我们最不希望看到的是强迫 Rust 生成缓慢的代码!
让我们开始这段重构旅程的原因之一是,我们观察到我们以前的收集器实现是泄漏内存的。在运行几天后,主机实例会耗尽内存并锁定,需要重新启动整个系统。
作为一个临时的解决方案,我们强迫这个过程每天重启几次。这解决了不稳定的问题,但对我们来说并不太合适。我们调查了一段时间的内存泄漏问题,但没有找到根本原因。我们继续前行,希望在重构后再来解决这个问题。
我们的第一直觉是,通过重写整个业务逻辑,内存泄漏要么会消失,要么会更容易诊断,然后修复。但这种情况并没有发生,新版本继续泄漏内存,而且没有明显的原因。这给了我们一种直觉,也许问题来自于上游,在我们的一个依赖关系中。在做了一些研究之后,我们发现reqwest 创建在使用默认内存分配器时有一个已知的内存泄漏问题,并建议使用优秀的 jemalloc来代替它。有趣的是,jemalloc曾经是 Rust 的默认内存分配器,后来被 glibc 的malloc实现所取代,它唯一真正的优势是它可以从可用的系统库中动态链接,减少二进制的大小。
因此,我们建议在为 tokio 异步运行时编写时使用jemalloc,特别是在处理一个长期运行的进程时。这就像在你的Cargo.toml中添加这一行一样简单:
tikv-jemallocator ={ version = "0.5.0", features = ["unprefixed_malloc_on_supported_platforms"] }
这是你的main.rs
:
use tikv_jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
jemalloc
不仅减少了内存泄漏的发生,而且还配备了一套强大的基准测试和自省工具。