多节点服务器定时任务重复处理的几种方案

1.使用zookeeper做分布式锁

@Component
public class MutexConfig {

	@Value(value = "${zookeeper.host}")
	private String zkHost;

	public Boolean getLock(String lock, Integer acquireTime, Integer sleepTime) throws Exception {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework client = CuratorFrameworkFactory.newClient(zkHost, retryPolicy);
		client.start();
		InterProcessMutex mutex = new InterProcessMutex(client, lock);
		Boolean result = mutex.acquire(acquireTime, TimeUnit.MILLISECONDS);
		Thread.sleep(sleepTime);
		if (mutex.isAcquiredInThisProcess()) {
			mutex.release();
		}
		client.close();
		return result;
	}
}

在配置文件里配置

zookeeper.path.smsxxx.xxtemplate=/dmp/smsxxx/xxtemplate

在定时任务实现类编写定时任务

@Value(value = "${zookeeper.path.smsxxx.xxtemplate}")
	private String jd;


/**
	 * 定时更新短信模板状态
	 */
	@Scheduled(cron = "0 0/10 * * * ?")
	public void updateJdState() {
		try {
			logger.info("updateJdState start");
			Boolean result = mutexConfig.getLock(jd, 2000, 3000);
			if (Boolean.TRUE.equals(result)) {
				logger.info("updateJdState content start");
				smsTemplateService.updateJdState();
			}
			logger.info("end");
		} catch (Exception e) {
			logger.error("updateJdState error: {}", e.getMessage());
		}
	}

2.使用mongodb做分布式锁

编写dao类

public interface DistributeLockerDAO extends BaseDAO<LockerPO> {

    /**
     * insert one locker
     * @param lockerPo
     * @return true if success, false if the locker is duplicated
     */
    boolean insert(LockerPO lockerPo);

    /**
     * the acquirer remove the locker with the id
     * @param id
     * @param acquirer
     * @return
     */
    boolean remove(String id, String acquirer);

    /**
     * update locker's time held by the acquirer
     * @param id
     * @param acquirer
     * @param time
     * @return
     */
    boolean updateShakenTime(String id, String acquirer, LocalDateTime time);

    /**
     * get the locker by the locker id
     * @param lockerId
     * @return
     */
    Optional<LockerPO> getLocker(String lockerId);

    /**
     * list the lockers
     * @param offset
     * @param limit
     * @return
     */
    List<LockerPO> list(long offset, int limit);


    /**
     * list the lockers with the acquirer
     * @param offset
     * @param limit
     * @param acquirer
     * @return
     */
    List<LockerPO> list(String acquirer, long offset, int limit);

    @Data
    @Accessors(chain = true)
    class LockerCountOfAcquirer {
        @Field("_id")
        String acquirer;
        long count;
    }
    /**
     * list the locker count group by the user
     * @return the map user->count pair list
     */
    List<LockerCountOfAcquirer> countByAcquirer();
}

锁的实体类

@Document(CollectionName.LOCKER)
@Data
@Accessors(chain = true)
public class LockerPO implements IdPO {
    @Id
    private String id;
    private String acquirer;
    private LocalDateTime acquiredAt;
    private LocalDateTime shakenAt;
}

锁的服务类

@Service
public class DistributeLockerServiceImpl implements DistributeLockerService {

    private final DistributeLockerDAO distributeLockerDAO;
    private final Gauge gauge;
    private final List<String> lastSyncAcquires;

    public DistributeLockerServiceImpl(
        @Value(value = "${prometheus.namespace}") String namespace,
        @Value(value = "${prometheus.subsystem}") String subsystem,
        DistributeLockerDAO distributeLockerDAO,
        MeterRegistry registry
    ) {
        this.distributeLockerDAO = distributeLockerDAO;
        lastSyncAcquires = new ArrayList<>();
        gauge = Gauge.build()
            .namespace(namespace)
            .subsystem(subsystem)
            .name("distribute_locker_total")
            .labelNames("user")
            .help("distribute locker statistic by the acquirer")
            .register(((PrometheusMeterRegistry)registry).getPrometheusRegistry());
    }

    @Override
    public boolean tryLock(String lockerId, String user) {

        LocalDateTime dateTime = LocalDateTime.now();
        boolean suc = distributeLockerDAO.insert(
            new LockerPO().setId(lockerId)
                .setAcquirer(user)
                .setAcquiredAt(dateTime)
                .setShakenAt(dateTime));

        if (!suc) {
            suc = distributeLockerDAO.updateShakenTime(lockerId, user, dateTime);
        }

        return suc;
    }

    @Override
    public boolean release(String lockerId, String user) {

        return distributeLockerDAO.remove(lockerId, user);
    }

    @Override
    public boolean shake(String lockerId, String user) {

        return distributeLockerDAO.updateShakenTime(lockerId, user, LocalDateTime.now());
    }

    @Override
    public Optional<LockerPO> getLocker(String lockerId) {

        return distributeLockerDAO.getLocker(lockerId);
    }

    @Override
    public List<LockerPO> list(long offset, int limit) {

        return distributeLockerDAO.list(offset, limit);
    }

    @Override public List<LockerPO> list(String user, long offset, int limit) {
        return distributeLockerDAO.list(user, offset, limit);
    }

    @Scheduled(fixedDelay = 500)
    public void monitorLockerCountOfAcquirer() {

        List<DistributeLockerDAO.LockerCountOfAcquirer> lockerCountOfAcquirers = distributeLockerDAO.countByAcquirer();
        updateMonitorOfLockerCountOfAcquirer(lockerCountOfAcquirers);

        List<String> acquires = lockerCountOfAcquirers.stream()
            .map(DistributeLockerDAO.LockerCountOfAcquirer::getAcquirer)
            .collect(Collectors.toList());

        resetLockerCountOfDeletedAcquire(acquires);
        updateLastSyncAcquires(acquires);
    }

    private void updateMonitorOfLockerCountOfAcquirer(List<DistributeLockerDAO.LockerCountOfAcquirer> lockerCountOfAcquirers) {
        lockerCountOfAcquirers.forEach(c -> gauge.labels(c.getAcquirer()).set(c.getCount()));
    }

    private void resetLockerCountOfDeletedAcquire(List<String> countOfUsers) {
        calcDeletedAcquirers(countOfUsers).forEach(a -> gauge.labels(a).set(0));
    }

    private List<String> calcDeletedAcquirers(List<String> acquirers) {
        return lastSyncAcquires.stream()
            .filter(s -> !acquirers.contains(s))
            .collect(Collectors.toList());
    }

    private void updateLastSyncAcquires(List<String> acquirers) {
        lastSyncAcquires.clear();
        lastSyncAcquires.addAll(acquirers);
    }
}

分布式锁的基类

@Slf4j
public class DistributeLockerBase implements LockerShakerScheduler {

    private final String acquirer;
    private final List<String> lockingIds;
    private final DistributeLockerService distributeLockerService;

    protected DistributeLockerBase(String acquirer,
        DistributeLockerService distributeLockerService) {

        Preconditions.checkArgument(!Strings.isNullOrEmpty(acquirer));
        Preconditions.checkArgument(distributeLockerService != null);

        this.acquirer = acquirer;
        this.lockingIds = Collections.synchronizedList(new ArrayList<>());
        this.distributeLockerService = distributeLockerService;
    }

    public final String getAcquirer() {
        return acquirer;
    }

    protected boolean lock(String lockerId) {

        boolean suc = distributeLockerService.tryLock(lockerId, acquirer);
        if (suc) {
            lockingIds.add(lockerId);
        }
        log.info("lock [{}], [{}]", lockerId, suc);
        return suc;
    }

    protected boolean release(String lockerId) {
        boolean suc = distributeLockerService.release(lockerId, acquirer);
        lockingIds.remove(lockerId);
        log.info("release [{}], [{}]", lockerId, suc);
        return suc;
    }

    /**
     * delay 1 second. after sync finished
     */
    @Scheduled(fixedDelay = 1000)
    @Override
    public void shake() {

        List<String> lockingIdsCopied;
        synchronized (this.lockingIds) {
            lockingIdsCopied = new ArrayList<>(this.lockingIds);
        }

        lockingIdsCopied.forEach(id -> {
            if (!distributeLockerService.shake(id, acquirer)) {
                log.error("[{}] shakes [{}] FAILED", acquirer, id);
            } else {
                log.info("[{}] shakes [{}] SUCCESS", acquirer, id);
            }
        });
    }
}


/**
 * the class used for defining the shake method
 * so that the <code>shake</code> method in the <code>DistributeLockerBase</code> can auto-scheduled
 * when some class inherits <code>DistributeLockerBase</code> and marks as a bean
 *
 * NOTE: it's dependent on the Spring boot version
 */
interface LockerShakerScheduler {
    void shake();
}

实现定时任务类

@Slf4j
@Component
public class BrandUVSyncher extends DistributeLockerBase {

     private static final String SPLITTER = "+";
    private static final String TASK_ID_PREFIX = "lbi-openapi-brandUV:";
    private static final long PERIOD = 10_000;

    private final MidPlatformClient midPlatformClient;
    private final BrandUVDAO brandUVDAO;
    private final AreaDAO areaDAO;
    private final List<AreaPO> top2LevelCities;
    /**
     * geo of banks in some city
     * cityName -> {bankName -> coordinate}
     */
    private final Map<String, Map<String, List<double[]>>> geoOfBanks;

    BrandUVSyncher(MidPlatformClient midPlatformClient, BrandUVDAO brandUVDAO,
        DistributeLockerService distributeLockerService, AreaDAO areaDAO) {
        super("BrandUVSyncher:" + UuidGenerator.newBase64Uuid(), distributeLockerService);
        this.midPlatformClient = midPlatformClient;
        this.brandUVDAO = brandUVDAO;
        this.areaDAO = areaDAO;
        this.geoOfBanks = loadGeoOfBanks();
        this.top2LevelCities = loadTop2LevelCity();
    }


      /**
     * sync the living&working uv of the cities of level 1&2
     */
    @Scheduled(fixedDelay = PERIOD, initialDelay = 3000)
    public void syncCityLivingAndWorkingUV() {

        log.info("run sync city WORKING&LIVING UV");

        // DO NOT MODIFY ME!
        final String lockerID = buildLockerID("syncCityLivingAndWorkingUV");

        LocalDate now = LocalDate.now();
        if (!isTimeToSync(now)) {
            return;
        }

        if (!lock(lockerID)) {
            log.error("locker [{}] failed", lockerID);
            return;
        }

        if (top2LevelCities.isEmpty()) {
            log.info("empty level 1 and level 2 cities");
            return;
        }

        LocalDate preMonth = now.minusMonths(1);
        LocalDate month = LocalDate.of(preMonth.getYear(), preMonth.getMonth(), 1);

        try {
            Stream.of(Brand.values()).forEach(
                b -> syncCityLivingAndWorkingUV(b.getCode(), month, top2LevelCities)
            );
        } finally {
            release(lockerID);
        }
    }


}

 

 

 

 

 

 

 

 

 

热门文章

暂无图片
编程学习 ·

C语言二分查找详解

二分查找是一种知名度很高的查找算法&#xff0c;在对有序数列进行查找时效率远高于传统的顺序查找。 下面这张动图对比了二者的效率差距。 二分查找的基本思想就是通过把目标数和当前数列的中间数进行比较&#xff0c;从而确定目标数是在中间数的左边还是右边&#xff0c;将查…
暂无图片
编程学习 ·

GMX 命令分类列表

建模和计算操作命令&#xff1a; 1.1 . 创建拓扑与坐标文件 gmx editconf - 编辑模拟盒子以及写入子组(subgroups) gmx protonate - 结构质子化 gmx x2top - 根据坐标生成原始拓扑文件 gmx solvate - 体系溶剂化 gmx insert-molecules - 将分子插入已有空位 gmx genconf - 增加…
暂无图片
编程学习 ·

一文高效回顾研究生课程《数值分析》重点

数值分析这门课的本质就是用离散的已知点去估计整体&#xff0c;就是由黑盒子产生的结果去估计这个黑盒子。在数学里这个黑盒子就是一个函数嘛&#xff0c;这门课会介绍许多方法去利用离散点最大化地逼近这个函数&#xff0c;甚至它的导数、积分&#xff0c;甚至微分方程的解。…
暂无图片
编程学习 ·

在职阿里5年,一个28岁女软测工程师的心声

简单的先说一下&#xff0c;坐标杭州&#xff0c;14届本科毕业&#xff0c;算上年前在阿里巴巴的面试&#xff0c;一共有面试了有6家公司&#xff08;因为不想请假&#xff0c;因此只是每个晚上去其他公司面试&#xff0c;所以面试的公司比较少&#xff09; ​ 编辑切换为居中…
暂无图片
编程学习 ·

字符串左旋c语言

目录 题目&#xff1a; 解题思路&#xff1a; 第一步&#xff1a; 第二步&#xff1a; 第三步&#xff1a; 总代码&#xff1a; 题目&#xff1a; 实现一个函数&#xff0c;可以左旋字符串中的k个字符。 例如&#xff1a; ABCD左旋一个字符得到BCDA ABCD左旋两个字符…
暂无图片
编程学习 ·

设计模式--观察者模式笔记

模式的定义与特点 观察者&#xff08;Observer&#xff09;模式的定义&#xff1a;指多个对象间存在一对多的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式&#xf…
暂无图片
编程学习 ·

睡觉突然身体动不了,什么是睡眠痽痪症

很多朋友可能有这样的体验&#xff0c;睡觉过程中突然意识清醒&#xff0c;身体却动弹不了。这时候感觉非常恐怖&#xff0c;希望旁边有一个人推自己一下。阳光以前也经常会碰到这样的情况&#xff0c;一年有一百多次&#xff0c;那时候很害怕晚上到来&#xff0c;睡觉了就会出…
暂无图片
编程学习 ·

深入理解C++智能指针——浅析MSVC源码

文章目录unique_ptrshared_ptr 与 weak_ptrstd::bad_weak_ptr 异常std::enable_shared_from_thisunique_ptr unique_ptr 是一个只移型别&#xff08;move-only type&#xff0c;只移型别还有std::mutex等&#xff09;。 结合一下工厂模式&#xff0c;看看其基本用法&#xff…
暂无图片
编程学习 ·

@TableField(exist = false)

TableField(exist false) //申明此字段不在数据库存在&#xff0c;但代码中需要用到它&#xff0c;通知Mybatis-plus在做写库操作是忽略它。,.
暂无图片
编程学习 ·

Java Web day15

第十二章文件上传和下载 一、如何实现文件上传 要实现Web开发中的文件上传功能&#xff0c;通常需要完成两步操作&#xff1a;一.是在Web页面中添加上传输入项&#xff1b;二是在Servlet中读取上传文件的数据&#xff0c;并保存到本地硬盘中。 需要使用一个Apache组织提供一个…
暂无图片
编程学习 ·

【51nod 2478】【单调栈】【前缀和】小b接水

小b接水题目解题思路Code51nod 2478 小b接水 题目 输入样例 12 0 1 0 2 1 0 1 3 2 1 2 1输出样例 6解题思路 可以发现最后能拦住水的都是向两边递减高度&#xff08;&#xff1f;&#xff09; 不管两个高积木之间的的积木是怎样乱七八糟的高度&#xff0c;最后能用来装水的…
暂无图片
编程学习 ·

花了大半天写了一个UVC扩展单元调试工具

基于DIRECTSHOW 实现的&#xff0c;用的是MFC VS2019. 详见&#xff1a;http://www.usbzh.com/article/detail-761.html 获取方法 加QQ群:952873936&#xff0c;然后在群文件\USB调试工具&测试软件\UVCXU-V1.0(UVC扩展单元调试工具-USB中文网官方版).exe USB中文网 USB中文…
暂无图片
编程学习 ·

贪心(一):区间问题、Huffman树

区间问题 例题一&#xff1a;区间选点 给定 N 个闭区间 [ai,bi]请你在数轴上选择尽量少的点&#xff0c;使得每个区间内至少包含一个选出的点。 输出选择的点的最小数量。 位于区间端点上的点也算作区间内。 输入格式 第一行包含整数 N&#xff0c;表示区间数。 接下来 …
暂无图片
编程学习 ·

C语言练习实例——费氏数列

目录 题目 解法 输出结果 题目 Fibonacci为1200年代的欧洲数学家&#xff0c;在他的着作中曾经提到&#xff1a;「若有一只免子每个月生一只小免子&#xff0c;一个月后小免子也开始生产。起初只有一只免子&#xff0c;一个月后就有两只免子&#xff0c;二个月后有三只免子…
暂无图片
编程学习 ·

Android开发(2): Android 资源

个人笔记整理 Android 资源 Android中的资源&#xff0c;一般分为两类&#xff1a; 系统内置资源&#xff1a;Android SDK中所提供的已经定义好的资源&#xff0c;用户可以直接拿来使用。 用户自定义资源&#xff1a;用户自己定义或引入的&#xff0c;只适用于当前应用的资源…
暂无图片
编程学习 ·

零基础如何在短时间内拿到算法offer

​算法工程师是利用算法处理事物的职业 算法&#xff08;Algorithm&#xff09;是一系列解决问题的清晰指令&#xff0c;也就是说&#xff0c;能够对一定规范的输入&#xff0c;在有限时间内获得所要求的输出。 如果一个算法有缺陷&#xff0c;或不适合于某个问题&#xff0c;执…
暂无图片
编程学习 ·

人工智能:知识图谱实战总结

人工智能python&#xff0c;NLP&#xff0c;知识图谱&#xff0c;机器学习&#xff0c;深度学习人工智能&#xff1a;知识图谱实战前言一、实体建模工具Protegepython&#xff0c;NLP&#xff0c;知识图谱&#xff0c;机器学习&#xff0c;深度学习 人工智能&#xff1a;知识图…
暂无图片
编程学习 ·

【无标题】

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…