分布式锁

1.什么是分布式锁?

  分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

  在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。

2.为什么要使用分布式锁

  • 成员变量 A 存在 JVM1、JVM2、JVM3 三个 JVM 内存中
  • 成员变量 A 同时都会在 JVM 分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的
  • 不是同时发过来,三个请求分别操作三个不同 JVM 内存区域的数据,变量 A 之间不存在共享,也不具有可见性,处理的结果也是不对的

    注:该成员变量 A 是一个有状态的对象

      如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题,这就是分布式锁要解决的问题

分布式锁的具体实现方案有如下三种:

  1. 基于数据库实现;
  2. 基于缓存(Redis等)实现;
  3. 基于Zookeeper实现;

3.基于数据库实现

  • 锁设计:数据库中的表有主键,主键不能重复,如果插入一个重复的主键.,则数据库必然报错.

  • 数据库加锁: 向主键添加一个固定值 1;

  • 数据库解锁: 从表中删除主键固定值1;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.jt.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import com.jt.mapper.MysqlMapper;
@Service("mysqlLock")
public class MysqlLock implements Lock{

@Autowired
private MysqlMapper mapper;

//表示尝试加锁
@Override
public boolean tryLock() {
try {
mapper.insert(); //向数据库中插入记录 定值
} catch (Exception e) {
return false; //加锁失败
}
return true; //加锁成功!!!
}

//表示加锁机制
@Override
public void lock() {
if(tryLock()) {
//表示加锁成功
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock();

}

@Override
public void unlock() {

mapper.delete();
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub

}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}

@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.jt;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

//模拟窗口卖票业务
@SpringBootTest
@RunWith(SpringRunner.class)
public class MysqlLockTest{
private int num = 100;
@Autowired
@Qualifier("mysqlLock")
private Lock lock;

class window implements Runnable {
//定义线程卖票操作
@Override
public void run() {
while(true) {
try {
lock.lock();
if(num>0) {
System.out.println(Thread.currentThread().getName()+"卖出第"+(101-num)+"张票");
num--;
Thread.sleep(100);
}else {
break;//跳出循环
}

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}


@Test
public void test01() {
Runnable window = new window();
Thread thread1 = new Thread(window, "窗口A");
Thread thread2 = new Thread(window, "窗口B");
Thread thread3 = new Thread(window, "窗口C");
thread1.start();
thread2.start();
thread3.start();
for(;;); //防止线程提前结束
}

}

4.基于redis实现

Redis锁的设计理念:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.jt.lock;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import com.jt.mapper.MysqlMapper;
import com.jt.stream.LuaStream;

import redis.clients.jedis.JedisCluster;
@Component("redisLock")
public class RedisLock implements Lock{

@Autowired
private JedisCluster jedisCluster;
private ThreadLocal<String> thread = new ThreadLocal<>();
private static final String key = "JT_LOCK";

//表示尝试加锁
@Override
public boolean tryLock() {
try {
String value = UUID.randomUUID().toString();
String result = jedisCluster.set(key, value, "NX", "PX", 800);
if(result.equalsIgnoreCase("ok")) {
thread.set(value);
return true; //加锁成功!!!
}
} catch (Exception e) {
return false; //加锁失败
}
return false; //加锁失败
}

//表示加锁机制
@Override
public void lock() {
if(tryLock()) {
//表示加锁成功
return;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}

@Override
public void unlock() {
String uuid = thread.get();
//所以使用lua脚本实现.保证数据的原子性操作
String script = LuaStream.getLuaFileString();
jedisCluster.eval(script, Arrays.asList(key), Arrays.asList(uuid));
}

@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub

}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}

@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}

lua脚本

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.jt;

import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.jt.thread.ThreadPool;

import redis.clients.jedis.JedisCluster;

//模拟窗口卖票业务
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedislLockTest{
private int num = 5000; //模拟5000张火车票
@Autowired
@Qualifier("redisLock")
private Lock lock;
@Autowired
private JedisCluster jedisCluster;

class window implements Runnable {
//定义线程卖票操作
@Override
public void run() {
while(true) {
lock.lock(); //加锁
if(num>0) {
System.out.println(Thread.currentThread().getName()+"售出第"+(5001-num)+"张票");
String ticket = jedisCluster.hget("windows",Thread.currentThread().getName());
jedisCluster.hset("windows",Thread.currentThread().getName(), ticket+","+(5001-num));
num--;
}else {
break;//跳出循环
}
lock.unlock(); //解锁
}
}
}


@Test
public void test01() {
Runnable window = new window();
//定义线程池 初始化10个线程
List<Thread> pools = ThreadPool.initPool(window,6);
for (Thread thread : pools) {
thread.start();
}
for(;;); //防止线程提前结束
}
}

5.其他锁例子

lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.jt;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;

//模拟窗口卖票业务
public class LockTest{

private int num = 100;

//定义lock锁
private Lock lock = new ReentrantLock();

class Windows implements Runnable{
//定义线程卖票操作
@Override
public void run() {
while(true) {
lock.lock(); //为线程加锁
if(num>0) {
System.out.println("恭喜"+Thread.currentThread().getName()+"售出得第"+(101-num)+"张票");
num--;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
break;//跳出循环
}
lock.unlock(); //为线程减锁
}
}
}

@Test
public void test01() {
Runnable runnable = new Windows();
Thread thread1 = new Thread(runnable, "窗口A");
Thread thread2 = new Thread(runnable, "窗口B");
Thread thread3 = new Thread(runnable, "窗口C");
thread1.start();
thread2.start();
thread3.start();
for(;;); //防止线程提前结束
}

}

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.jt;

import org.junit.Test;

//模拟窗口卖票业务
public class SyncTest{

private int num = 100;

class Windows implements Runnable{
//定义线程卖票操作
@Override
public void run() {
while(true) {
synchronized (this) {
if(num>0) {
System.out.println("恭喜"+Thread.currentThread().getName()+"售出得第"+(101-num)+"张票");
num--;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
break;//跳出循环
}
}
}
}
}

@Test
public void test01() {
Runnable runnable = new Windows();
Thread thread1 = new Thread(runnable, "窗口A");
Thread thread2 = new Thread(runnable, "窗口B");
Thread thread3 = new Thread(runnable, "窗口C");
thread1.start();
thread2.start();
thread3.start();
for(;;); //防止线程提前结束
}

}

thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.jt;

import org.junit.Test;

//模拟窗口卖票业务
public class TicketTest{

private int num = 100;

class Windows implements Runnable{
//定义线程卖票操作
@Override
public void run() {
while(true) {
if(num>0) {
System.out.println("恭喜"+Thread.currentThread().getName()+"售出得第"+(101-num)+"张票");
num--;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
break;//跳出循环
}
}

}
}

//模拟三个线程同时售卖100张车票
@Test
public void test01() {
Runnable runnable = new Windows();
Thread thread1 = new Thread(runnable, "窗口A");
Thread thread2 = new Thread(runnable, "窗口B");
Thread thread3 = new Thread(runnable, "窗口C");
thread1.start();
thread2.start();
thread3.start();
for(;;); //防止线程提前结束
}

}