1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::{channel::channel_tick, SentryApi};
use adapter::{prelude::*, Adapter};
use primitives::Config;
use slog::{error, info, Logger};
use std::error::Error;
use futures::{
future::{join, join_all},
TryFutureExt,
};
use tokio::{runtime::Runtime, time::sleep};
/// Worker that drives channel ticks through a [`SentryApi`].
///
/// `C` is bounded by `Unlocked`, so both the Sentry handle and the adapter are
/// parameterised over the same unlocked client type.
///
/// When built through `Worker::from_sentry`, the `config`, `adapter` and
/// `logger` fields are clones of the corresponding fields on `sentry`.
#[derive(Debug, Clone)]
pub struct Worker<C: Unlocked> {
/// Sentry API handle used to collect channels; cloned and given propagation
/// validators on every tick.
pub sentry: SentryApi<C, ()>,
/// Configuration; the worker reads `worker.wait_time` and
/// `worker.max_channels` from it.
pub config: Config,
/// Adapter in the `UnlockedState`.
pub adapter: Adapter<C, UnlockedState>,
/// Logger that receives all tick diagnostics.
pub logger: Logger,
}
impl<C: Unlocked + 'static> Worker<C> {
pub fn from_sentry(sentry: SentryApi<C, ()>) -> Self {
Self {
config: sentry.config.clone(),
adapter: sentry.adapter.clone(),
logger: sentry.logger.clone(),
sentry,
}
}
pub fn run(self, is_single_tick: bool) -> Result<(), Box<dyn Error>> {
let rt = Runtime::new()?;
if is_single_tick {
rt.block_on(self.all_channels_tick());
} else {
rt.block_on(self.infinite());
}
Ok(())
}
pub async fn infinite(&self) {
loop {
let wait_time_future = sleep(self.config.worker.wait_time);
let _result = join(self.all_channels_tick(), wait_time_future).await;
}
}
pub async fn all_channels_tick(&self) {
let logger = &self.logger;
let (channels_context, validators) = match self.sentry.collect_channels().await {
Ok(res) => res,
Err(err) => {
error!(logger, "Error collecting all channels for tick"; "collect_channels" => ?err, "main" => "all_channels_tick");
return;
}
};
let channels_size = channels_context.len();
let sentry_with_propagate = match self.sentry.clone().with_propagate(validators) {
Ok(sentry) => sentry,
Err(err) => {
error!(logger, "Failed to set propagation validators: {err}"; "err" => ?err, "main" => "all_channels_tick");
return;
}
};
let tick_results = join_all(channels_context.into_iter().map(|channel_context| {
let channel = channel_context.context;
channel_tick(&sentry_with_propagate, &self.config, channel_context)
.map_err(move |err| (channel, err))
}))
.await;
for (channel, channel_err) in tick_results.into_iter().filter_map(Result::err) {
error!(logger, "Error processing Channel"; "channel" => ?channel, "error" => ?channel_err, "main" => "all_channels_tick");
}
info!(logger, "Processed {} channels", channels_size);
if channels_size >= self.config.worker.max_channels as usize {
error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &self.config.worker.max_channels; "main" => "all_channels_tick");
}
}
}