NotificationScheduler.java
package no.nav.data.team.notify;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.DefaultLockingTaskExecutor;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import no.nav.data.common.auditing.domain.AuditVersion;
import no.nav.data.common.auditing.domain.AuditVersionRepository;
import no.nav.data.common.auditing.dto.AuditMetadata;
import no.nav.data.common.rest.PageParameters;
import no.nav.data.common.storage.StorageService;
import no.nav.data.common.storage.domain.GenericStorage;
import no.nav.data.common.utils.DateUtil;
import no.nav.data.team.notify.domain.*;
import no.nav.data.team.notify.domain.Notification.NotificationTime;
import no.nav.data.team.po.domain.ProductArea;
import no.nav.data.team.shared.domain.DomainObjectStatus;
import no.nav.data.team.shared.domain.Membered;
import no.nav.data.team.team.domain.Team;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static no.nav.data.common.utils.StreamUtils.*;
import static org.docx4j.com.google.common.math.IntMath.pow;
import static org.springframework.util.CollectionUtils.isEmpty;
@Slf4j
@Component
@ConditionalOnProperty(value = "team-catalog.envlevel", havingValue = "primary")
public class NotificationScheduler {
private LocalDateTime snooze = null;
private int snoozeTimes = 0;
private final NotificationRepository repository;
private final NotificationService service;
private final AuditVersionRepository auditVersionRepository;
private final StorageService storage;
private final AuditDiffService auditDiffService;
public NotificationScheduler(NotificationRepository repository, NotificationService service, AuditVersionRepository auditVersionRepository,
StorageService storage, AuditDiffService auditDiffService) {
this.repository = repository;
this.service = service;
this.auditVersionRepository = auditVersionRepository;
this.storage = storage;
this.auditDiffService = auditDiffService;
}
@Bean
public ApplicationRunner notifyInit(LockProvider lockProvider) {
return _ -> {
LockConfiguration config = new LockConfiguration(Instant.now(), "notifyInit", Duration.ofMinutes(1), Duration.ofMinutes(1));
new DefaultLockingTaskExecutor(lockProvider).executeWithLock((Runnable) this::doInit, config);
};
}
void doInit() {
// As we will not send notifications if no notifications have been sent yet, initialize them
var pageable = new PageParameters(0, 1).createSortedPageByFieldDescending(AuditVersion.Fields.time);
var audits = auditVersionRepository.findAll(pageable);
var lastAudit = audits.getTotalElements() > 0 ? audits.getContent().getFirst().getId() : null;
for (NotificationTime time : NotificationTime.values()) {
var state = getState(time);
if (state.getLastAuditNotified() == null && lastAudit != null) {
state.setLastAuditNotified(lastAudit);
state = storage.save(state);
log.info("initialized state {}", state);
}
}
}
// ┌───────────── second (0-59)
// │ ┌───────────── minute (0 - 59)
// │ │ ┌───────────── hour (0 - 23)
// │ │ │ ┌───────────── day of the month (1 - 31)
// │ │ │ │ ┌───────────── month (1 - 12) (or JAN-DEC)
// │ │ │ │ │ ┌───────────── day of the week (0 - 7)
// │ │ │ │ │ │ (or MON-SUN -- 0 or 7 is Sunday)
// │ │ │ │ │ │
// * * * * * *
@Scheduled(cron = "0 0 10 * * TUE") // Every TUE 10am
@SchedulerLock(name = "nudgeTime")
public void nudgeTime() {
storage.getAll(Team.class).stream()
.filter(t -> t.getStatus() == DomainObjectStatus.ACTIVE)
.forEach(this::timeBasedNudge);
storage.getAll(ProductArea.class).stream()
.filter(pa -> pa.getStatus() == DomainObjectStatus.ACTIVE)
.forEach(this::timeBasedNudge);
}
/**
* Nudge teams and product areas that have not been updated in a while
*/
private <T extends Membered> void timeBasedNudge(T object) {
var cutoff = LocalDateTime.now().minus(NotificationConstants.NUDGE_TIME_CUTOFF);
var lastModified = object.getChangeStamp().getLastModifiedDate();
var lastNudge = Optional.ofNullable(object.getLastNudge()).orElse(lastModified);
if (lastModified.isBefore(cutoff) && lastNudge.isBefore(cutoff)) {
service.nudge(object);
repository.updateNudge(object.getId(), LocalDateTime.now().toString());
}
}
@Scheduled(cron = "0 * * * * ?") // Every whole minute, if more than 5 errors, do snooze for 3+4^X minutes where X is times snoozed ,max 5
@SchedulerLock(name = "runNotifyTasks")
public void runNotifyTasks() {
Duration uptime = DateUtil.uptime();
if (uptime.minus(Duration.ofMinutes(2)).isNegative()) {
log.info("NotifyTasks - skip uptime {}", uptime);
return;
}
if (snooze != null && snooze.isAfter(LocalDateTime.now())) {
return;
}
snooze = null;
int maxErrors = 5;
var tasks = storage.getAll(NotificationTask.class);
log.info("NotifyTasks - running {} tasks", tasks.size());
var errors = 0;
for (var task : tasks) {
if (errors >= maxErrors) {
snoozeTimes = Math.max(snoozeTimes + 1, 5);
snooze = LocalDateTime.now().plusMinutes(3L + pow(4, snoozeTimes));
log.warn("NotifyTasks - Max Errors reached -> Snoozing until {}", snooze);
return;
}
try {
if (!service.notifyTask(task)) {
storage.delete(task.getId(), NotificationTask.class);
}
storage.delete(task);
snoozeTimes = 0;
} catch (MailNotFoundException e) {
log.warn("Email error on task id: %s".formatted(task.getId()), e);
} catch (Exception e) {
errors++;
log.error("Failed to notify task id: %s".formatted(task.getId()), e);
}
}
}
@Scheduled(cron = "30 * * * * ?") // Once every half past minute
@SchedulerLock(name = "allNotify")
public void allNotify() {
var uptime = DateUtil.uptime();
if (uptime.minus(Duration.ofMinutes(2)).isNegative()) {
log.info("ALL - Notification skip uptime {}", uptime);
return;
}
summary(NotificationTime.ALL);
}
@Scheduled(cron = "0 0 9 * * ?") // Every day at 9am
@SchedulerLock(name = "dailyNotify")
public void dailyNotify() {
summary(NotificationTime.DAILY);
}
@Scheduled(cron = "0 0 10 * * MON") // Every Monday at 10am
@SchedulerLock(name = "weeklyNotify")
public void weeklyNotify() {
summary(NotificationTime.WEEKLY);
}
@Scheduled(cron = "0 0 11 1 * ?") // First day of every month at 11 am
@SchedulerLock(name = "monthlyNotify")
public void monthlyNotify() {
summary(NotificationTime.MONTHLY);
}
void summary(NotificationTime time) {
log.info("{} - Notification running", time);
var state = getState(time);
UUID lastAuditId = null;
if (state.getLastAuditNotified() != null) {
var audits = union(
auditVersionRepository.getAllMetadataAfter(state.getLastAuditNotified()),
isEmpty(state.getSkipped()) ? List.of() : auditVersionRepository.getMetadataByIds(state.getSkipped())
);
audits.sort(Comparator.comparing(AuditMetadata::getTime));
if (time == NotificationTime.ALL) {
// Skip objects that have been edited very recently
LocalDateTime cutoff = LocalDateTime.now().minusMinutes(3);
var recents = filter(audits, a -> a.getTime().isAfter(cutoff)).stream().map(AuditMetadata::getTableId).distinct().toList();
var removed = filter(audits, a -> recents.contains(a.getTableId()));
audits.removeIf(removed::contains);
if (!removed.isEmpty()) {
log.info("Skipping {}", toString(removed));
}
state.setSkipped(convert(removed, AuditMetadata::getTableId));
}
if (audits.isEmpty()) {
log.info("{} - Notification end - no new audits", time);
return;
}
var notifications = GenericStorage.to(repository.findByTime(time), Notification.class);
var lastAudit = audits.getLast();
lastAuditId = lastAudit.getId();
var tasksForIdent = auditDiffService.createTask(audits, notifications);
tasksForIdent.forEach(storage::save);
}
state.setLastAuditNotified(lastAuditId);
storage.save(state);
log.info("{} - Notification end at {}", time, lastAuditId);
}
private NotificationState getState(NotificationTime time) {
return storage.getAll(NotificationState.class).stream()
.filter(n -> n.getTime() == time).findFirst()
.orElse(NotificationState.builder().time(time).build());
}
private String toString(List<? extends AuditMetadata> auditMetadatas) {
return convert(auditMetadatas, a ->
"{tableName=" + a.getTableName() +
" tableId=" + a.getTableId() +
(a instanceof TeamAuditMetadata amp ? "paId=" + amp.getProductAreaId() + "}" : "}")
).toString();
}
}