/**
* 实现群聊
* 服务端向每个客户端一对一通信,将一个客户端的消息发送给其他所有人,客户端与客户端之间没有直接联系
*/
public class GroupChatServer {
public static String msg;
//Sender线程和Receiver线程的缓冲区,Receiver线程收到某个客户端的消息,放入 msg ,唤醒Sender线程群发消息
public static void main(String[] args) {
try(ServerSocket ss = new Server socket (8888)) {
System.out.println(“群聊服务器已启动,等待客户端连接”);
while (true){
Socket socket = ss.accept();
System.out.println(“有新的客户端连接: “+socket.getInetAddress());
new Receiver2(socket).start();
new Sender2( Socket ).start();
}
} catch (IO Exception e) {
throw new RuntimeException(e);
}
}
}
class Sender2 extends Thread{
//向客户端一对一发送消息
final Socket socket;
public Sender2(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
while (true){
synchronized (“”){
//所有收发线程同步””
“”.wait();
//每次先等待,当有人说话了再唤醒,发送消息
}
if (socket.isClosed()||socket.isOutput Shutdown ())break;
//如果Receiver线程.readLine返回null即对方Socket对象已关闭,则关闭己方Socket对象,Sender线程检测到Socket关闭退出循环结束线程
pw.println(GroupChatServer.msg);
//服务器对每个客户端一对一发送,将这一个消息群发给所有客户端,因为只是读取msg而不会改变它所以不需要同步
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}finally {
if (socket != null) {
try {
socket.close();
//在Receiver线程也会finally socket.close() close方法可以多次执行,方法内判断如果isClosed直接return,不会产生异常
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
class Receiver2 extends Thread{
//与客户端一对一接收消息
final Socket socket;
public Receiver2(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader br = new BufferedReader(new InputStream Reader(socket.getInputStream()))) {
synchronized (“”){
GroupChatServer.msg = “用户”+socket.getInetAddress()+”加入群聊”;
//每当有新用户加入时在群内提示
“”.notifyAll();
}
while (true){
String newMsg = br.readLine();
if (newMsg == null){
System.out.println(“用户”+socket.getInetAddress()+”结束通信”);
synchronized (“”){
GroupChatServer.msg = “用户”+socket.getInetAddress()+”退出群聊”;
“”.notifyAll();
}
break;
}
synchronized (“”){
//同步””每次只能一个Receiver线程存msg
GroupChatServer.msg = “用户”+socket.getInetAddress()+”说: “+newMsg;
“”.notifyAll();
//.notifyAll()唤醒所有””.wait()阻塞的Sender线程,每个Sender线程将msg发送给客户端后再次wait
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
if (socket!=null){
try {
socket.close();
//Receiver线程先发现用户结束通信,唤醒Sender线程后break执行close,这时Sender还在就绪状态排队,所以Receiver线程会更早关闭socket
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
class GCS2{
//解决:当多个Receiver同时收到客户端的消息,在Sender唤醒但还没有得到CPU资源或部分 线程 没得到CPU资源发送消息时,下一个Receiver将msg更改,部分用户漏收消息的可能性
static String msg;
static Socket microphone;
//缓存发送msg的Socket
final static ArrayList<Socket> group = new ArrayList<>();
//组员容器
final static ArrayList<PrintWriter> pws = new ArrayList<>();
//输出流容器
public static void add(Socket socket){
group.add(socket);
try {
pws.add(new PrintWriter(socket.getOutputStream(),true));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
try(ServerSocket ss = new ServerSocket(8888)) {
new Sender3().start();
//用一个Sender线程处理所有输出流,每次将msg发送给所有人之后再让Receiver传新的msg
while (true){
Socket socket = ss.accept();
add(socket);
new Receiver3(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
class Receiver3 extends Thread{
final Socket socket;
public Receiver3(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”);
//push()方法用于更新msg,单独写方法体便于复用
while (true){
String nMsg = br.readLine();
if (nMsg == null) {
push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”);
break;
}
push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+ nMsg);
//客户端发送给服务端的信息已经被缓存在Socket中了,每次. readLine ()是读取本地缓存中的内容,也就是说即便Receiver线程因为排队更新msg而阻塞,也不会漏掉客户端的某段msg
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
private void push(String msg) throws InterruptedException {
synchronized (“r”){
if (GCS2.msg!=null){
“r”.wait();
//当Sender没有取走msg时.wait
}
GCS2.msg = msg;
GCS2.microphone = socket;
}
synchronized (“s”){
“s”.notify();
//唤醒Sender线程,Receiver线程在”r”.wait()阻塞,Sender线程在”s”.wait()阻塞,如果是同一个对象锁的话使用notify()无法保证唤醒的是Sender
}
}
}
class Sender3 extends Thread{
@Override
public void run() {
while (true){
synchronized (“s”){
if (GCS2.msg==null) {
try {
“s”.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
checkConnection();
//将已经关闭的Socket从group移除并关闭PW输出流
for (int i = 0;i<GCS2.group.size();i++){
if (GCS2.group.get(i)!=GCS2.microphone){
GCS2.pws.get(i).println(GCS2.msg);
}
}
synchronized (“r”){
GCS2.msg=null;
GCS2.microphone = null;
“r”.notify();
//唤醒一个因为”r”.wait()阻塞的Receiver线程更新信息
}
}
}
private void checkConnection(){
ArrayList<Integer> list = new ArrayList<>();
for (int i = GCS2.group.size()-1;i>=0;i–){
if (GCS2.group.get(i).isClosed()||GCS2.group.get(i).isOutputShutdown()){
list.add(i);
}
}
for (int i :
list) {
GCS2.group.remove(i);
GCS2.pws.get(i).close();
GCS2.pws.remove(i);
//从后往前删,每次删除不会影响下一个要删的元素的 索引
}
}
}
class GCS3{
final static String[] history = new String[99];
//使用数组缓存历史记录
static int cursor;
public static void push(String msg){
synchronized (history){
//存的时候串行
history[cursor++]=msg;
if (cursor==99){
cursor=0;
}
}
synchronized (“s”){
“s”.notifyAll();
}
}
public static void main(String[] args) {
try(ServerSocket ss = new ServerSocket(8888)) {
System.out.println(“服务器启动”);
while (true){
Socket socket = ss.accept();
new Receiver4(socket).start();
new Sender4(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
class Sender4 extends Thread{
final Socket socket;
int index;
public Sender4(Socket socket) {
this.socket = socket;
this.index = GCS3.cursor;
}
@Override
public void run() {
try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
while (true){
synchronized (“s”){
if (index==GCS3.cursor){
“s”.wait();
}
}
//取的时候并行
if (socket.isClosed()||socket.isOutputShutdown())break;
for (int c = GCS3.cursor;index!=c;index=(index+1)%99){
pw.println(GCS3.history[index]);
}
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
class Receiver4 extends Thread{
final Socket socket;
public Receiver4(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”);
while (true){
String msg = br.readLine();
if (msg == null) {
GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”);
break;
}
GCS3.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+msg);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
class GCS5 {
static int total;
//当前通信的连接总数
static int count;
//对一段msg完成发送的Sender数
static String msg = “加入群聊”;
static Socket microphone;
public static void totalUp(){
synchronized(“total”){
total++;
}
}
public static void totalDown(){
synchronized (“total”){
total–;
//修改同一数据时需要串行
}
}
public static void countUp(){
synchronized (“count”){
count++;
}
}
public static void push(String s,Socket socket){
if (GCS5.count<GCS5.total) {
synchronized (“r”) {
try {
“r”.wait();
//每次放入新的消息前先等待,等着前一句全部发送完毕后被唤醒
boolean b;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
synchronized (“count”){
count=0;
msg = s;
microphone = socket;
}
synchronized (“s”){
“s”.notifyAll();
}
}
public static void main(String[] args) {
try(ServerSocket ss = new ServerSocket(8888)) {
System.out.println(“已启动”);
while (true){
Socket socket = ss.accept();
System.out.println(“新的连接”);
new Receiver5(socket).start();
new Sender5(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
class Receiver5 extends Thread{
final Socket socket;
public Receiver5(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”加入群聊”,socket);
while (true){
String msg = br.readLine();
if (msg == null) {
GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”退出群聊”,socket);
break;
}
GCS5.push(“用户”+socket.getInetAddress()+”:”+socket.getPort()+”说: “+msg,socket);
}
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
GCS5.totalDown();
}
}
class Sender5 extends Thread{
final Socket socket;
public Sender5(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(PrintWriter pw = new PrintWriter(socket.getOutputStream(),true)) {
pw.println(“加入群聊”);
GCS5.totalUp();
//Sender线程运行后再total++,使创建到运行这期间count不会卡在total-1
while (!socket.isClosed()){
GCS5.countUp();
if (socket!=GCS5.microphone){
pw.println(GCS5.msg);
}
//先count++,如果Receiver线程正在更新msg,更新后count增加的1是新的msg的发送量
if (GCS5.count>=GCS5.total){
synchronized(“r”){
“r”.notify();
//当Sender都发送完之后唤醒一个.wait()的Receiver。但如果这时没有.wait()的Receiver,所有Sender全部.wait(),再收到新的msg时Receiver需要自己判断count>=total不执行.wait()
}
}
synchronized (“s”){
“s”.wait();
}
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}