NotificationScheduler.java

  1. package no.nav.data.team.notify;

  2. import lombok.extern.slf4j.Slf4j;
  3. import net.javacrumbs.shedlock.core.DefaultLockingTaskExecutor;
  4. import net.javacrumbs.shedlock.core.LockConfiguration;
  5. import net.javacrumbs.shedlock.core.LockProvider;
  6. import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
  7. import no.nav.data.common.auditing.domain.AuditVersion;
  8. import no.nav.data.common.auditing.domain.AuditVersionRepository;
  9. import no.nav.data.common.auditing.dto.AuditMetadata;
  10. import no.nav.data.common.rest.PageParameters;
  11. import no.nav.data.common.storage.StorageService;
  12. import no.nav.data.common.storage.domain.GenericStorage;
  13. import no.nav.data.common.utils.DateUtil;
  14. import no.nav.data.team.notify.domain.Notification;
  15. import no.nav.data.team.notify.domain.Notification.NotificationTime;
  16. import no.nav.data.team.notify.domain.NotificationRepository;
  17. import no.nav.data.team.notify.domain.NotificationState;
  18. import no.nav.data.team.notify.domain.NotificationTask;
  19. import no.nav.data.team.notify.domain.TeamAuditMetadata;
  20. import no.nav.data.team.po.domain.ProductArea;
  21. import no.nav.data.team.shared.domain.DomainObjectStatus;
  22. import no.nav.data.team.shared.domain.HistorizedDomainObject;
  23. import no.nav.data.team.shared.domain.Membered;
  24. import no.nav.data.team.team.domain.Team;
  25. import org.springframework.boot.ApplicationRunner;
  26. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  27. import org.springframework.context.annotation.Bean;
  28. import org.springframework.scheduling.annotation.Scheduled;
  29. import org.springframework.stereotype.Component;

  30. import java.time.Duration;
  31. import java.time.Instant;
  32. import java.time.LocalDateTime;
  33. import java.util.Comparator;
  34. import java.util.List;
  35. import java.util.Optional;
  36. import java.util.UUID;

  37. import static java.util.stream.Collectors.toList;
  38. import static no.nav.data.common.utils.StreamUtils.convert;
  39. import static no.nav.data.common.utils.StreamUtils.filter;
  40. import static no.nav.data.common.utils.StreamUtils.union;
  41. import static org.docx4j.com.google.common.math.IntMath.pow;
  42. import static org.springframework.util.CollectionUtils.isEmpty;

  43. @Slf4j
  44. @Component
  45. @ConditionalOnProperty(value = "team-catalog.envlevel", havingValue = "primary")
  46. public class NotificationScheduler {


  47.     private LocalDateTime snooze = null;
  48.     private int snoozeTimes = 0;

  49.     private final NotificationRepository repository;
  50.     private final NotificationService service;
  51.     private final AuditVersionRepository auditVersionRepository;
  52.     private final StorageService storage;
  53.     private final AuditDiffService auditDiffService;

  54.     public NotificationScheduler(NotificationRepository repository, NotificationService service, AuditVersionRepository auditVersionRepository,
  55.             StorageService storage, AuditDiffService auditDiffService) {
  56.         this.repository = repository;
  57.         this.service = service;
  58.         this.auditVersionRepository = auditVersionRepository;
  59.         this.storage = storage;
  60.         this.auditDiffService = auditDiffService;
  61.     }

  62.     @Bean
  63.     public ApplicationRunner notifyInit(LockProvider lockProvider) {
  64.         return args -> {
  65.             LockConfiguration config = new LockConfiguration(Instant.now(), "notifyInit", Duration.ofMinutes(1), Duration.ofMinutes(1));
  66.             new DefaultLockingTaskExecutor(lockProvider).executeWithLock((Runnable) this::doInit, config);
  67.         };
  68.     }

  69.     void doInit() {
  70.         // As we will not send notifications if no notifications have been sent yet, initialize them
  71.         var pageable = new PageParameters(0, 1).createSortedPageByFieldDescending(AuditVersion.Fields.time);
  72.         var audits = auditVersionRepository.findAll(pageable);
  73.         var lastAudit = audits.getTotalElements() > 0 ? audits.getContent().get(0).getId() : null;

  74.         for (NotificationTime time : NotificationTime.values()) {
  75.             var state = getState(time);
  76.             if (state.getLastAuditNotified() == null && lastAudit != null) {
  77.                 state.setLastAuditNotified(lastAudit);
  78.                 state = storage.save(state);
  79.                 log.info("initialized state {}", state);
  80.             }
  81.         }
  82.     }

  83.     //                  ┌───────────── second (0-59)
  84.     //                  │ ┌───────────── minute (0 - 59)
  85.     //                  │ │  ┌───────────── hour (0 - 23)
  86.     //                  │ │  │ ┌───────────── day of the month (1 - 31)
  87.     //                  │ │  │ │ ┌───────────── month (1 - 12) (or JAN-DEC)
  88.     //                  │ │  │ │ │ ┌───────────── day of the week (0 - 7)
  89.     //                  │ │  │ │ │ │          (or MON-SUN -- 0 or 7 is Sunday)
  90.     //                  │ │  │ │ │ │
  91.     //                  * *  * * * *
  92.     @Scheduled(cron =  "0 0 10 * * TUE") // Every TUE 10am
  93.     @SchedulerLock(name = "nudgeTime")
  94.     public void nudgeTime() {
  95.         storage.getAll(Team.class).stream()
  96.                 .filter(t -> t.getStatus() == DomainObjectStatus.ACTIVE)
  97.                 .forEach(this::timeBasedNudge);

  98.         storage.getAll(ProductArea.class).stream()
  99.                 .filter(pa -> pa.getStatus() == DomainObjectStatus.ACTIVE)
  100.                 .forEach(this::timeBasedNudge);
  101.     }

  102.     /**
  103.      * Nudge teams and product areas that have not been updated in a while
  104.      */
  105.     private <T extends Membered> void timeBasedNudge(T object) {
  106.         var cutoff = LocalDateTime.now().minus(NotificationConstants.NUDGE_TIME_CUTOFF);
  107.         var lastModified = object.getChangeStamp().getLastModifiedDate();
  108.         var lastNudge = Optional.ofNullable(object.getLastNudge()).orElse(lastModified);
  109.         if (lastModified.isBefore(cutoff) && lastNudge.isBefore(cutoff)) {
  110.             service.nudge(object);
  111.             repository.updateNudge(object.getId(), LocalDateTime.now().toString());
  112.         }
  113.     }

  114.     @Scheduled(cron = "0 * * * * ?") // Every whole minute, if more than 5 errors, do snooze for 3+4^X minutes where X is times snoozed ,max 5
  115.     @SchedulerLock(name = "runNotifyTasks")
  116.     public void runNotifyTasks() {
  117.         Duration uptime = DateUtil.uptime();
  118.         if (uptime.minus(Duration.ofMinutes(2)).isNegative()) {
  119.             log.info("NotifyTasks - skip uptime {}", uptime);
  120.             return;
  121.         }

  122.         if (snooze != null && snooze.isAfter(LocalDateTime.now())) {
  123.             return;
  124.         }
  125.         snooze = null;
  126.         int maxErrors = 5;

  127.         var tasks = storage.getAll(NotificationTask.class);
  128.         log.info("NotifyTasks - running {} tasks", tasks.size());
  129.         var errors = 0;
  130.         for (var task : tasks) {
  131.             if (errors >= maxErrors) {
  132.                 snoozeTimes = Math.max(snoozeTimes + 1, 5);
  133.                 snooze = LocalDateTime.now().plusMinutes(3L + pow(4, snoozeTimes));
  134.                 log.warn("NotifyTasks - Max Errors reached -> Snoozing until {}", snooze);
  135.                 return;
  136.             }

  137.             try {
  138.                 service.notifyTask(task);
  139.                 storage.delete(task);

  140.                 snoozeTimes = 0;
  141.             } catch (MailNotFoundException e) {
  142.                 log.warn("Email error on task id: %s".formatted(task.getId()), e);
  143.             } catch (Exception e) {
  144.                 errors++;
  145.                 log.error("Failed to notify task id: %s".formatted(task.getId()), e);
  146.             }
  147.         }
  148.     }

  149.     @Scheduled(cron = "30 * * * * ?") // Once every half past minute
  150.     @SchedulerLock(name = "allNotify")
  151.     public void allNotify() {
  152.         var uptime = DateUtil.uptime();
  153.         if (uptime.minus(Duration.ofMinutes(2)).isNegative()) {
  154.             log.info("ALL - Notification skip uptime {}", uptime);
  155.             return;
  156.         }
  157.         summary(NotificationTime.ALL);
  158.     }

  159.     @Scheduled(cron = "0 0 9 * * ?") // Every day at 9am
  160.     @SchedulerLock(name = "dailyNotify")
  161.     public void dailyNotify() {
  162.         summary(NotificationTime.DAILY);
  163.     }

  164.     @Scheduled(cron = "0 0 10 * * MON") // Every Monday at 10am
  165.     @SchedulerLock(name = "weeklyNotify")
  166.     public void weeklyNotify() {
  167.         summary(NotificationTime.WEEKLY);
  168.     }

  169.     @Scheduled(cron = "0 0 11 1 * ?") // First day of every month at 11 am
  170.     @SchedulerLock(name = "monthlyNotify")
  171.     public void monthlyNotify() {
  172.         summary(NotificationTime.MONTHLY);
  173.     }

  174.     void summary(NotificationTime time) {
  175.         log.info("{} - Notification running", time);
  176.         var state = getState(time);
  177.         UUID lastAuditId = null;

  178.         if (state.getLastAuditNotified() != null) {
  179.             var audits = union(
  180.                     auditVersionRepository.getAllMetadataAfter(state.getLastAuditNotified()),
  181.                     isEmpty(state.getSkipped()) ? List.of() : auditVersionRepository.getMetadataByIds(state.getSkipped())
  182.             );
  183.             audits.sort(Comparator.comparing(AuditMetadata::getTime));

  184.             if (time == NotificationTime.ALL) {
  185.                 // Skip objects that have been edited very recently
  186.                 LocalDateTime cutoff = LocalDateTime.now().minusMinutes(3);
  187.                 var recents = filter(audits, a -> a.getTime().isAfter(cutoff)).stream().map(AuditMetadata::getTableId).distinct().collect(toList());
  188.                 var removed = filter(audits, a -> recents.contains(a.getTableId()));
  189.                 audits.removeIf(removed::contains);
  190.                 if (!removed.isEmpty()) {
  191.                     log.info("Skipping {}", toString(removed));
  192.                 }
  193.                 state.setSkipped(convert(removed, AuditMetadata::getTableId));
  194.             }

  195.             if (audits.isEmpty()) {
  196.                 log.info("{} - Notification end - no new audits", time);
  197.                 return;
  198.             }
  199.             var notifications = GenericStorage.to(repository.findByTime(time), Notification.class);
  200.             var lastAudit = audits.get(audits.size() - 1);
  201.             lastAuditId = lastAudit.getId();

  202.             var tasksForIdent = auditDiffService.createTask(audits, notifications);
  203.             tasksForIdent.forEach(storage::save);
  204.         }

  205.         state.setLastAuditNotified(lastAuditId);
  206.         storage.save(state);
  207.         log.info("{} - Notification end at {}", time, lastAuditId);
  208.     }


  209.     private NotificationState getState(NotificationTime time) {
  210.         return storage.getAll(NotificationState.class).stream()
  211.                 .filter(n -> n.getTime() == time).findFirst()
  212.                 .orElse(NotificationState.builder().time(time).build());
  213.     }

  214.     private String toString(List<? extends AuditMetadata> auditMetadatas) {
  215.         return convert(auditMetadatas, a ->
  216.                 "{tableName=" + a.getTableName() +
  217.                         " tableId=" + a.getTableId() +
  218.                         (a instanceof TeamAuditMetadata amp ? "paId=" + amp.getProductAreaId() + "}" : "}")
  219.         ).toString();
  220.     }


  221. }