高并发多线程基础之线程间通信与数据共享及其应用(代码片段)

踩踩踩从踩 踩踩踩从踩     2022-12-31     651

关键词:

前言

本篇文章主要介绍的是java多线程之间如何通信,协同处理任务,以及数据共享,定时任务处理等操作。

多线程之间通信的方式

在实际开发过程中多个线程同时操作,有两种情况的,数据共享和线程间协作

数据共享的方式

  • 文件共享数据
通过文件达到多线程间数据共享

public static void main(String[] args) throws Exception 
		Path path = Paths.get("file.log");
		// 线程1 - 写入数据
		new Thread(() -> 
			while (true) 
				String content = "当前时间" + String.valueOf(System.currentTimeMillis());
				try 
					Files.write(path, content.getBytes());
					Thread.sleep(1000L);
				 catch (Exception e) 
					// TODO Auto-generated catch block
					e.printStackTrace();
				
			
		).start();
		// 线程2 - 读取数据
		new Thread(() -> 
			while (true) 
				try 
					byte[] allBytes = Files.readAllBytes(path);
					System.out.println(new String(allBytes));
					Thread.sleep(1000L);
				 catch (Exception e) 
					// TODO Auto-generated catch block
					e.printStackTrace();
				
			
		).start();
	
  • 网络共享,常见利用redis等分布式中间键,或者自己搭建的数据中间仓库
  • 共享变量 ,全局变量,堆内存中的变量

数据共享会带来很多问题,涉及到协作,这个是分不开的,这个在后面继续描述

线程间协作

jdk 提供的线程协调 API ,例如: suspend/resume wait/notify park/unpark
多线程协作的典型场景:生产者-消费者 模型。(线程阻塞、线程唤醒)

suspend和resume

作用:调用suspend挂起目标线程,通过resume可以恢复线程执行。


    public void test1_normal() throws Exception 
        Thread consumerThread = new Thread(new Runnable() 
            @Override
            public void run() 
                if (iceCream == null) 
                    System.out.println("等待...");
                    Thread.currentThread().suspend();
                
                System.out.println("完成");
            
        );
        consumerThread.start();

        Thread.sleep(3000L);    
        iceCream = new Object();      

        System.out.println("通知");
        consumerThread.resume();    //通知结束等待

    

suspend和resume 在jdk中被抛弃,主要原因 

  • 如果在suspend线程等待的时候,添加了synchnized关键字对象给锁住,就没办法进行解锁了。
 synchronized (Demo7_SuspendResume.class)
                        Thread.currentThread().suspend();
                    

  • resume在suspend之前 ,唤醒在等待之前也会导致死锁的情况


    public void test1_normal() throws Exception 
        Thread consumerThread = new Thread(new Runnable() 
            @Override
            public void run() 
                if (iceCream == null) 
                    Thread.sleep(7000L);    
                    System.out.println("等待...");
                    Thread.currentThread().suspend();
                
                System.out.println("完成");
            
        );
        consumerThread.start();

        Thread.sleep(3000L);    
        iceCream = new Object();      

        System.out.println("通知");
        consumerThread.resume();    //通知结束等待

    

 就会造成死锁的情况

wait和notify

wait方法导致当前线程等待,加入该对象的等待集合中,并且放弃当前持有的对象锁。
notify/notifyAll方法唤醒一个或所有正在等待这个对象锁的线程。
wait的时候会自动释放锁,也就是说不会造成死锁的情况
注意1:虽然会wait自动解锁,但是对顺序有要求, 如果在notify被调用之后,才开始wait方法
的调用,线程会永远处于WAITING状态。
注意2:这些方法只能由同一对象锁的持有者线程调用,也就是写在同步块里面,否则会抛出
IllegalMonitorStateException异常。
public void test1_normal() throws Exception 
		// 开启一个线程
		new Thread(new Runnable() 
			@Override
			public void run() 
				while (iceCream == null)  //
					synchronized (this) 
						System.out.println("拿到锁。。。");
						try 
							System.out.println("等待...");
							this.wait();
						 catch (InterruptedException e) 
							e.printStackTrace();
						
					
				
				System.out.println("完成");
			
		).start();

		Thread.sleep(3000L); // 3秒之后
		iceCream = new Object(); //

		synchronized (this) 
			System.out.println("拿到锁。。。");
			this.notifyAll();
			System.out.println("通知");
		
	

 wait和notify是对象的属性,当然也包括 线程对象

synchronized ("synchronized") 
			System.out.println("拿到锁。。。");
			this.notifyAll();
			System.out.println("通知");
		

如果把锁改一下,不是一把锁,执行notifyAll时,则会抛出异常,因为释放不掉锁

拿到锁。。。
等待...
拿到锁。。。
Exception in thread "main" java.lang.IllegalMonitorStateException
	at java.lang.Object.notifyAll(Native Method)
	at org.cao.thread.wait.Demo8_WaitNotify.test1_normal(Demo8_WaitNotify.java:62)
	at org.cao.thread.wait.Demo8_WaitNotify.main(Demo8_WaitNotify.java:8)

会出现死锁的情况,由于如果先唤醒,在锁住,就会出现死锁的情况;

public void test1_normal() throws Exception 
		// 开启一个线程
		new Thread(new Runnable() 
			@Override
			public void run() 
				while (iceCream == null)  //
					synchronized (this) 
						System.out.println("拿到锁。。。");
						try 
							Thread.sleep(3000L); // 
							System.out.println("等待...");
							this.wait();
						 catch (InterruptedException e) 
							e.printStackTrace();
						
					
				
				System.out.println("完成");
			
		).start();

		Thread.sleep(7000L); // 
		iceCream = new Object(); //

		synchronized (this) 
			System.out.println("拿到锁。。。");
			this.notifyAll();
			System.out.println("通知");
		
	

 

park和unpark机制

线程调用 park 则等待“许可” , unpark 方法为指定线程提供“许可 (permit)”
  • 调用unpark之后,再调用park,线程会直接运行
  • 提前调用的unpark不叠加,连续多次调用unpark后,第一次调用park后会拿到“许可”直接运行,后续调用会进入等待。
  • 一次许可,只能解决一次等待,在一次等待,则需要再一次许可

public void test1_normal() throws Exception 
		Thread consumerThread = new Thread(new Runnable() 
			@Override
			public void run() 
				while (iceCream == null) 

					System.out.println("等待...");
					LockSupport.park();
				
				System.out.println("完成");
			
		);
		consumerThread.start();

		Thread.sleep(3000L); // 3秒之后
		iceCream = new Object(); //

		LockSupport.unpark(consumerThread); //

		System.out.println("通知");
	

这里一定要将线程对象传给unpark,只能唤醒一个线程。

会存在死锁的情况

  • 在同步代码块中使用park/unpark,容易出现死锁
public void test2_DeadLock() throws Exception 
		Thread consumerThread = new Thread(new Runnable() 
			@Override
			public void run() 
				if (iceCream == null)  // 
					System.out.println("等待...");

					synchronized (this)  // 若拿到锁
						LockSupport.park(); // 执行park
					
				
				System.out.println("完成");
			
		);
		consumerThread.start();

		Thread.sleep(3000L); // 3秒之后
		iceCream = new Object(); // 

		synchronized (this)  // 争取到锁以后,才能恢复consumerThread
			LockSupport.unpark(consumerThread);
		
		System.out.println("通知");
	

主要看一下 park和unpark的源代码里面都是 使用的unsafe cas 自旋锁保证线程数据的安全。

总的来说

suspend/resume:加锁会出现死锁的情况,先唤醒后挂起也会出现死锁的情况,因此被jdk弃用
wait/notify:必须加锁,只用于 synchronized 关键字,如果是锁对象不同,则释放锁时,会抛异常,先唤醒后挂起也会出现死锁
park/unpark:添加synchronized也会出现死锁,先唤醒后挂起不会出现死锁,使用的unsafe cas 自旋锁保证线程数据的安全

伪唤醒

应该在循环中检查等待条件,原因是处于等待状态的线程可能会收到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。
伪唤醒是指线程并非因为notify、notifyall、unpark等api调用而意外唤醒,是更底层原因
导致的。
while(iceCream == null)  // 
					System.out.println("等待...");

					synchronized (this)  // 若拿到锁
						LockSupport.park(); // 执行park
					
				

也就是cpu自己断了,底层异常情况,只能通过while来保证数据的安全性

线程通信机制的应用

可以利用通信机制实现一个简单的锁

首先实现lock初始化的方法

public class MyLock implements Lock 

	@Override
	public void lock() 
		// TODO Auto-generated method stub

	

	@Override
	public void lockInterruptibly() throws InterruptedException 
		// TODO Auto-generated method stub

	

	@Override
	public boolean tryLock() 
		// TODO Auto-generated method stub
		return false;
	

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
		// TODO Auto-generated method stub
		return false;
	

	@Override
	public void unlock() 
		// TODO Auto-generated method stub

	

	@Override
	public Condition newCondition() 
		// TODO Auto-generated method stub
		return null;
	

  • 用全局变量存储获取的线程 ,首先这个是线程安全的全局变量;
	private AtomicReference<Thread> owner = new AtomicReference<Thread>();
  • 并且锁占用是,线程被挂起,挂起的线程的引用被放到waiters队列  这里要达到多个线程拿锁
private BlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

 

  • 判断当前线程是否可以加锁,实现trylock方法
	@Override
	public boolean tryLock() 
		return owner.compareAndSet(null, Thread.currentThread());
	
  • 去抢锁,然后用到线程通信机制,就使用park,实现lock方法
	@Override
	public void lock() 
		while (!tryLock()) 
			Thread curTh = Thread.currentThread(); // 获取当前线程引用
			waiters.offer(curTh);
			LockSupport.park();
		
	
  • 然后实现释放锁的方法,实现unlock方法
@Override
	public void unlock() 
		if (owner.get() == Thread.currentThread()) 
			owner.set(null); // 将onwer置为null,释放锁
			Thread th = waiters.poll(); // 取出队列头部的元素,并移除该元素
			LockSupport.unpark(th); // 唤醒队列头部的元素
		
	

这里是不可重入的锁

多线程应用场景

  场景一:批量处理任务

  • 向大量(100w以上)的用户发送邮件
  • 处理大批量文件
  • 例如我在工作中需要按地区 或者 按设备类型 或者按用户等的维度去统计数据,而一个地区的网关设备可能有几百万个,并且维度很多,所以一定要多线程,多任务去处理
场景2:实现异步
  •  快速响应用户,入浏览器请求网页、图片时
  • 自动作业处理
场景3:增大吞吐量 ,例如 tomcat 、数据库 等服务

结语

整篇文章主要围绕着线程间通信,和数据共享的,依据线程间通信实现了一个简单的锁,在实际开发中直接用park和unpark,很少用到;至少我在开发很少用到,都是直接jdk提供的工具,但了解线程间通信是很有必要的

java多线程并发09——如何实现线程间与线程内数据共享

本文将为各位带来Java阻塞队列相关只是。关注我的公众号「Java面典」了解更多Java相关知识点。线程间数据共享Java里面进行多线程通信的主要方式就是共享内存的方式,共享内存主要的关注点有两个:可见性和有序性原子性。Ja... 查看详情

高并发编程学习——线程通信详解(代码片段)

为获得良好的阅读体验,请访问原文:传送门前序文章高并发编程学习(1)——并发基础-https://www.wmyskxz.com/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-ji-chu/一、经典的生产者消费者案例上一篇文章我们提到一个应用可以创建多个线程去... 查看详情

并发编程之进程与线程

并发编程之进程与线程2.1线程与进程2.1.1进程2.1.2线程2.1.3二者对比2.2并行与并发2.3应用 2.1线程与进程2.1.1进程程序指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至CPU,数据加载至内存。在指令运行过... 查看详情

java多线程之synchronized及其优化

...对应的是jdk层面的J.U.C提供的基于AbstractQueuedSynchronizer的并发组件。synchronized提供的是互斥同步,互斥同步是指在多个线程并发访问共享数据时,保证共享数据在同一时刻只有一个线程访问。在jvm中,被synchronized修饰的代码块经ja... 查看详情

尚硅谷juc高并发编程学习笔记线程通信与集合线程安全(代码片段)

一、线程间通信线程间通信的模型有两种:共享内存和消息传递线程间的通信具体步骤:(涉及上中下部)1、创建资源类,在资源类中船舰属性和操作方法2、在资源类操作方法:判断、操作、通知3、创建... 查看详情

java多线程与并发(案例+应用)(代码片段)

...个线程之间共享数据的方式8.Java原子性类的应用9.Java线程并发库的应用(线程池)10.Callable与future的应用11.线程锁的技术12.Java读写锁技术的使用13.Java条件阻塞condition的应用14 查看详情

linux设备驱动基础01之并发与竞态(代码片段)

...中要注意对共享资源的保护,需要管理对共享资源的并发访问。Linux系统产生并发访问的几个主要原因:(1)、多线程并发访问,Linux是多任务系统,在应用程序中多线程访问是最基本的原因。(2)、抢占式并发访问&#x... 查看详情

啃碎并发:内存模型之基础概述

前言在并发编程中,需要解决两个关键问题:线程之间如何通信;线程之间如何同步; 线程通信是指线程之间以何种机制来交换信息。在命令式编程中,线程之间的通信机制有两种:共享内存和消息传递。在共享内存的并发... 查看详情

高并发多线程基础之线程池(代码片段)

高并发多线程之线程基础中生命周期、线程封闭、cpu缓存前言本篇文章描述我们jdk给我们提供的线程池;了解为什么使用线程池,有哪些优点,以及几种Executors中提供给我们的工厂创建方法等线程池的原理为什么要使... 查看详情

java并发编程之线程安全线程通信

Java多线程开发中最重要的一点就是线程安全的实现了。所谓Java线程安全,可以简单理解为当多个线程访问同一个共享资源时产生的数据不一致问题。为此,Java提供了一系列方法来解决线程安全问题。synchronizedsynchronized用于同步... 查看详情

java多线程与并发:内存模型

前言在并发变成中,我们需要关注两个问题:线程之间如何通信。线程之间如何同步。线程之间通信指的是线程之间如何交换信息。线程之间的通信机制有两种:共享内存和消息传递。在共享内存的并发模型里,线程之间共享程... 查看详情

初识多线程之基础知识与常用方法

1.线程与进程的描述: 1.1进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1~n个线程。(进程是资源分配的最小单位)  1.2线程:同一类线程共享代码和数据空间,每... 查看详情

等待与唤醒机制(线程之间的通信)(代码片段)

...程的任务)却不相同。为什么要处理线程间通信多个线程并发执行时,在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行,那么多线程之间需要一些协调通信,以此来帮... 查看详情

等待与唤醒机制

...是处理的动作却不相同。为什么处理线程间通信?多线程并发执行时,在默认情况下CPU是随机切换线程的,当我们需要多线程来共同完成一件任务,并且我们希望他们有规律的执行,那么多线程之间需要一些协调通信,以此来帮... 查看详情

java基础教程:多线程基础——线程间的通信

Java基础教程:多线程基础(2)——线程间的通信使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。线程间的通信思维导图 等... 查看详情

java并发编程线程简介(进程与线程|并发概念|线程间通信|java并发3特性)(代码片段)

文章目录一、进程与线程二、并发三、线程间通信四、Java并发3特性一、进程与线程最开始是没有线程这个概念的,一个应用程序就是一个进程,应用程序运行时,如果还要处理与用户交互的逻辑,二者只能交替进行,这样CPU执行效率... 查看详情

高并发多线程之线程基础中生命周期线程封闭cpu缓存(代码片段)

...的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程包括由操作系统分配的内存空间,包含一个或多个线程。一个线程不能独立的存在,它必须是进程的一... 查看详情

c++并发编程之二在线程间共享数据(代码片段)

文章目录1.1互斥锁(mutex)保护共享数据1.1.1std::mutex的成员函数std::mutex::lock()和std::mutex::unlock()(不推荐使用)1.1.2使用std::lock_guard保护共享数据1.1.3使用std::unique_lock保护共享数据1.2保护共享数据的其他方式1.2.1初始化过程中... 查看详情