TCP solves the problem of short write
2022-07-26 08:53:00 【CAir2】
Short writes only occur in non-blocking mode.
What is a buffer?
See the reference article "An animated illustration of socket buffers".
Under what circumstances does a short write occur?
When the remaining space in the socket send buffer is not enough to hold all of the data being sent, only part of it is copied in, and send/write returns fewer bytes than requested; if there is no room at all, the call returns -1 with errno set to EAGAIN. This is the short-write phenomenon, and the remaining data has to be sent again once the buffer has enough free space.
send/write: returns the number of bytes actually accepted by the kernel; errno is EAGAIN when nothing could be written.
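For illustration only (this helper is not part of the article's code), a minimal non-blocking write loop shows where the short write appears: send may accept only part of the data, and returns -1 with EAGAIN/EWOULDBLOCK once the kernel send buffer is full.

// Sketch: try to push `size` bytes into a non-blocking socket's send buffer.
// Returns how many bytes the kernel accepted; a value smaller than `size` is a short write.
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>

ssize_t try_send_all(int fd, const char *data, size_t size)
{
    size_t sent = 0;
    while (sent < size)
    {
        ssize_t n = send(fd, data + sent, size - sent, 0);
        if (n > 0)
        {
            sent += n;                                   // partial progress is possible
        }
        else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
        {
            break;                                       // buffer full: caller must retry later
        }
        else if (n == -1 && errno == EINTR)
        {
            continue;                                    // interrupted by a signal: retry now
        }
        else
        {
            return -1;                                   // real error (e.g. EPIPE)
        }
    }
    return (ssize_t)sent;
}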
How can short writes be solved? (for the epoll LT mode)
Method 1: set the socket to blocking mode (a minimal sketch follows below).
Method 2: maintain your own send buffer and send the data with send/write from the EPOLLONESHOT | EPOLLOUT event.
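Method 1 is not shown in the article's code; as a rough sketch under that assumption, it only requires clearing O_NONBLOCK with fcntl, after which send blocks until the data has been copied into the kernel send buffer, so a short write is normally not reported:

#include <fcntl.h>

// Sketch for Method 1: switch the socket back to blocking mode.
bool set_blocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return false;
    return fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) != -1;
}

The rest of this article follows Method 2.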
The idea:
- Encapsulate a send function (async_send in the code below) backed by an internal ring buffer. It returns false when the buffer does not have enough free space to hold the data, and true when the data was buffered, in which case it registers the EPOLLONESHOT | EPOLLOUT event. NOTE: this may be called from any thread, so the EPOLLOUT handler has to be thread-safe.
- Handle the EPOLLOUT event (with thread synchronization): take data from the ring buffer and send it with send/write. If the send succeeds, release the corresponding ring-buffer space; if it fails with EAGAIN or EINTR, retry. When the ring buffer is empty, stop registering EPOLLOUT (otherwise epoll keeps waking up and wastes CPU). Remember to re-arm EPOLLONESHOT after handling the event and to restore EPOLLIN, because EPOLLIN is cancelled at send time.
SocketContext.h
#pragma once
#include<mutex>
#include<memory>
#include<stdexcept>
#include<stdio.h>
#include<string.h>
#include<errno.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<netinet/in.h>
#define MAX_CIRCLE_BUF (10240) // 10 KB
class CircleBuffer{
public:
CircleBuffer(){
m_data_size = 0;
m_data_start_index = 0;
}
// Append `size` bytes to the ring; returns false when there is not enough free space.
bool append(char *buffer,int size)
{
std::lock_guard<std::mutex> lock(m_mtx);
if(size + m_data_size > MAX_CIRCLE_BUF)
{
return false;
}
int data_end_index = (m_data_start_index + m_data_size)%MAX_CIRCLE_BUF;
if(data_end_index + size <= MAX_CIRCLE_BUF)
{
memcpy(m_buf + data_end_index,buffer,size);
m_data_size += size;
return true;
}
else
{
memcpy(m_buf + data_end_index,buffer,MAX_CIRCLE_BUF - data_end_index);
int rest_len = size - (MAX_CIRCLE_BUF - data_end_index);
memcpy(m_buf,buffer + (MAX_CIRCLE_BUF - data_end_index),rest_len);
m_data_size += size;
return true;
}
}
// Returns a pointer to the contiguous readable region and sets `size`;
// because of wrap-around, `size` may be smaller than the total buffered data.
void *get_buffer(int &size)
{
std::lock_guard<std::mutex> lock(m_mtx);
if(m_data_start_index + m_data_size <= MAX_CIRCLE_BUF)
{
size = m_data_size;
return m_buf + m_data_start_index;
}
else
{
size = MAX_CIRCLE_BUF - m_data_start_index;
return m_buf + m_data_start_index;
}
}
// Release `size` bytes that have been consumed (e.g. successfully sent).
void free_buf(int size)
{
std::lock_guard<std::mutex> lock(m_mtx);
if (m_data_size < size)
{
throw std::runtime_error("circular buffer error");
}
m_data_size -= size;
m_data_start_index = (m_data_start_index + size)%MAX_CIRCLE_BUF;
}
bool is_empty()
{
std::lock_guard<std::mutex> lock(m_mtx);
return m_data_size <= 0;
}
private:
std::mutex m_mtx;
char m_buf[MAX_CIRCLE_BUF];
int m_data_size;
int m_data_start_index;
};
class SocketContext{
public:
SocketContext(int pollid,int sockid,bool is_listen_socket=false)
{
m_is_listen_socket = is_listen_socket;
m_epoll_fd = pollid;
m_sock_fd = sockid;
addfd();
}
virtual ~SocketContext()
{
removefd();
close(m_sock_fd);
m_epoll_fd = -1;
m_sock_fd = -1;
}
bool async_send(char* data,int size)
{
bool result = m_buf.append((char*)data,size);
// May be called from any thread. Sending registers EPOLLOUT and cancels EPOLLIN, so the EPOLLOUT handler must be thread-safe.
resetOneshot(false,true);
return result;
}
std::shared_ptr<SocketContext> do_accept()
{
struct sockaddr_in client_addr = {0};
socklen_t addr_len = sizeof(client_addr);
int client_sock = accept(m_sock_fd,(struct sockaddr *)&client_addr,&addr_len);
resetOneshot();
if(client_sock == -1)
{
return nullptr;
}
return std::shared_ptr<SocketContext>(new SocketContext(m_epoll_fd,client_sock));
}
int do_recv()
{
char szBuffer[1024] = "";
int count = read(m_sock_fd,szBuffer,sizeof(szBuffer) - 1); // leave room for '\0' so %s below is safe
if(count <= 0)
{
return count;
}
resetOneshot();
printf("socket[%d] recv data:%s\r\n",m_sock_fd,szBuffer);
// This part is the test code
std::string strData = "hellow word abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVW0123456789 qqqqqqq";
async_send((char *)strData.data(),strData.size());
return count;
}
// EPOLLOUT handler: drain the ring buffer. try_to_lock below keeps concurrent
// EPOLLOUT events (from different worker threads) from sending the same data twice.
int do_send()
{
//keep thread safe
std::unique_lock<std::mutex> lk(m_send_mtx,std::try_to_lock);
if(!lk.owns_lock())
{
//do nothing
return 0;
}
int data_size = 0;
void *pdata = m_buf.get_buffer(data_size);
if(data_size > 0)
{
int send_size = send(m_sock_fd,pdata,data_size,0);
if(send_size > 0)
{
m_buf.free_buf(send_size);
}
else
{
printf("send failed:%d\r\n",errno);
}
// To restore EPOLLIN event
resetOneshot(true,true);
return send_size;
}
else
{
// To restore EPOLLIN event
resetOneshot(true,false);
return 0;
}
}
int epoll_fd()
{
return m_epoll_fd;
}
int sock_fd()
{
return m_sock_fd;
}
protected:
void setnonblocking()
{
int flag = fcntl(m_sock_fd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(m_sock_fd, F_SETFL, flag);
}
void addfd()
{
setnonblocking();
struct epoll_event event;
event.data.ptr = this;
if(m_is_listen_socket)
{
event.events = EPOLLIN|EPOLLONESHOT;
}
else
{
event.events = EPOLLIN|EPOLLONESHOT|EPOLLOUT;
}
if(-1 == epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD,m_sock_fd,&event))
{
printf("addfd error:%s\r\n", strerror(errno));
}
}
void removefd()
{
if(-1 == epoll_ctl(m_epoll_fd,EPOLL_CTL_DEL,m_sock_fd,NULL))
{
printf("removefd error:%s\r\n", strerror(errno));
}
}
// Re-arm the EPOLLONESHOT fd; in_event/out_event control whether EPOLLIN/EPOLLOUT are restored.
void resetOneshot(bool in_event = true,bool out_event = true)
{
struct epoll_event event;
event.data.ptr = this;
event.events = EPOLLONESHOT;
if(m_is_listen_socket)
{
if(in_event)
{
event.events |= EPOLLIN;
}
}
else
{
if(in_event)
{
event.events |= EPOLLIN;
}
if(out_event)
{
event.events |= EPOLLOUT;
}
}
if(-1 == epoll_ctl(m_epoll_fd, EPOLL_CTL_MOD,m_sock_fd,&event))
{
printf("modifyfd error:%s\r\n", strerror(errno));
}
}
private:
bool m_is_listen_socket;
int m_sock_fd;
int m_epoll_fd;
std::mutex m_send_mtx;
CircleBuffer m_buf;
};
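Before moving on to main.cpp, a quick standalone sketch (not part of the original article) of how the CircleBuffer above is meant to be used: append data, take the contiguous readable chunk with get_buffer, then release what was consumed with free_buf. Because of wrap-around, draining may take more than one round.

// Hypothetical driver for CircleBuffer; assumes the SocketContext.h shown above.
#include "SocketContext.h"

int main()
{
    CircleBuffer buf;
    const char msg[] = "hello ring buffer";
    if (!buf.append((char *)msg, sizeof(msg) - 1))
    {
        printf("buffer full\r\n");
        return 1;
    }
    while (!buf.is_empty())
    {
        int size = 0;
        void *p = buf.get_buffer(size);          // contiguous readable region only
        printf("draining %d bytes: %.*s\r\n", size, size, (char *)p);
        buf.free_buf(size);                      // pretend these bytes were sent
    }
    return 0;
}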
main.cpp
#include<stdio.h>
#include<iostream>
#include<sys/socket.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<string.h>
#include<mutex>
#include<thread>
#include <sys/syscall.h>
#include <signal.h>
#include<map>
#include"SockContext.h"
std::map<int,std::shared_ptr<SocketContext> > g_cli_list;
int handle_accepter_event(int thread_id,int epoll_id,int listen_sock)
{
const int MAX_EVENTS = 10;
struct epoll_event events[MAX_EVENTS];
while (1)
{
int ret = epoll_wait(epoll_id,events,MAX_EVENTS,1000);
if(ret == -1)
{
if(errno == EINTR)
{
continue;
}
printf("[%d]epoll_wait error:%s\r\n",thread_id, strerror(errno));
return -1;
}
for (int i = 0; i < ret; i++)
{
SocketContext *pctx = (SocketContext *)events[i].data.ptr;
//listen_sock
if (pctx->sock_fd()== listen_sock)
{
auto client = pctx->do_accept();
if(client!= nullptr)
{
g_cli_list[client->sock_fd()] = client;
printf("socket[%d] connected\r\n",client->sock_fd());
}
else
{
printf("socket connected error\r\n");
}
}
else if(events[i].events & EPOLLIN)
{
int nread_count = pctx->do_recv();
if(nread_count <= 0)
{
printf("[%d]socket[%d] disconnected\r\n",thread_id, pctx->sock_fd());
g_cli_list.erase(pctx->sock_fd());
}
}
else if(events[i].events & EPOLLOUT)
{
pctx->do_send();
}
}
}
}
void handle_signal(int signal)
{
if(signal == SIGPIPE)
{
printf("recv sig pipe\r\n");
}
}
int main()
{
signal(SIGPIPE,handle_signal);
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if(listen_sock == -1)
{
printf("socket error:%s\r\n", strerror(errno));
return -1;
}
int reuse = 1;
setsockopt(listen_sock,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
struct sockaddr_in ser_addr = {0};
ser_addr.sin_family = AF_INET;
ser_addr.sin_port = htons(6360);
ser_addr.sin_addr.s_addr = INADDR_ANY;
if(-1 == bind(listen_sock, (struct sockaddr *)&ser_addr, sizeof(ser_addr)))
{
printf("bind socket error:%s\r\n", strerror(errno));
return -1;
}
if(-1 == listen(listen_sock,5))
{
printf("listen socket error:%s\r\n", strerror(errno));
return -1;
}
int epoll_id = epoll_create(5);
if(epoll_id == -1)
{
printf("epoll_create error:%s\r\n", strerror(errno));
return -1;
}
g_cli_list[listen_sock] = std::shared_ptr<SocketContext>(new SocketContext(epoll_id,listen_sock,true));
std::thread t(handle_accepter_event,1,epoll_id,listen_sock);
std::thread t2(handle_accepter_event,2,epoll_id,listen_sock);
t.join();
t2.join();
printf("hellow word\r\n");
return 0;
}
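To exercise the server, here is a minimal test client sketch (not in the original article); it assumes the server is running on the same machine and listening on port 6360 as configured in main():

// Hypothetical test client: connect to 127.0.0.1:6360, send one message, print the reply.
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

int main()
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(6360);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
    {
        printf("connect error:%s\r\n", strerror(errno));
        return -1;
    }
    const char msg[] = "ping";
    send(fd, msg, sizeof(msg) - 1, 0);
    char buf[1024] = "";
    int n = read(fd, buf, sizeof(buf) - 1);      // blocking read of the server's test reply
    if (n > 0)
    {
        printf("recv: %s\r\n", buf);
    }
    close(fd);
    return 0;
}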
The server code above is close to a complete solution to the short-write problem, because the ring buffer preserves the integrity of the outgoing data. In an extreme case, however, the socket can stop being readable: sending data cancels the EPOLLIN event, which is only restored in the next EPOLLOUT event. If the kernel send buffer stays full and nothing more can be written, EPOLLOUT never fires and EPOLLIN is never restored. The code therefore needs a further optimization:
When sending data, if the ring buffer is empty, call the system send directly. If send delivers only part of the data, append the remainder to the ring buffer and let the EPOLLOUT event send it later. This path must be thread-safe so that concurrent sends cannot interleave their data (the snippet below guards it with a separate mutex, m_async_send_mtx).
bool async_send(char* data,int size)
{
// Must be locked so that concurrent senders cannot interleave their data
std::lock_guard<std::mutex> lock(m_async_send_mtx);
int send_ok_bytes = 0;
// If the ring buffer is empty there is no pending short-write data, so send directly; otherwise just append
if(m_buf.is_empty())
{
send_ok_bytes = send(m_sock_fd, data, size,MSG_DONTWAIT);
if(send_ok_bytes == size)
{
return true;
}
if(send_ok_bytes < 0) send_ok_bytes = 0; // send failed (e.g. EAGAIN): nothing was written, buffer everything
}
bool result = m_buf.append((char*)data + send_ok_bytes,size - send_ok_bytes);
resetOneshot(false,true);
return result;
}
This greatly alleviates the extreme case described above, but it is not a 100% fix. To solve it completely, EPOLLIN must not be cancelled at all; the EPOLLIN handler then has to be made thread-safe itself, using the same try-lock pattern as the EPOLLOUT handler (with its own mutex, m_dorecv_mtx in the snippet below) to keep the data safe.
int do_recv()
{
//keep thread safe
std::unique_lock<std::mutex> lk(m_dorecv_mtx,std::try_to_lock);
if(!lk.owns_lock())
{
// Why not return 0 here? read() returning 0 means the peer disconnected,
// so returning 0 would make the caller think the connection is gone.
// This is only a demo and the caller does not accumulate the byte count,
// so returning 1 is harmless; for real use the function prototype should be changed.
return 1;
}
char szBuffer[1024] = "";
int count = read(m_sock_fd,szBuffer,sizeof(szBuffer) - 1); // leave room for '\0' so %s below is safe
if(count <= 0)
{
return count;
}
resetOneshot();
printf("socket[%d] recv data:%s\r\n",m_sock_fd,szBuffer);
std::string strData = "hellow word abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVW0123456789 qqqqqqq";
int64_t total_send = 0;
while (async_send((char *)strData.data(),strData.size()))
{
total_send += strData.size();
printf("send total:%lld data[%d]\r\n",total_send,strData.size());
}
return count;
}
bool async_send(char* data,int size)
{
...
// changed to true here: keep EPOLLIN registered while also arming EPOLLOUT
resetOneshot(true,true);
return result;
}
With this, the problems caused by short writes can be solved completely. One question remains: how big should the ring buffer be? On a server, a buffer that is too large will blow up memory once the number of connections grows, so oversizing is not recommended; besides, when sending is already backed up, whatever the peer eventually receives is stale data anyway, so buffering a lot of it has limited value. Caching roughly 5-10 average-sized packets is usually enough. Alternatively, dynamic memory can be used so that nothing is pre-allocated; dynamic buffers are beyond the scope of this article.
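Purely as a rough illustration of that dynamic alternative (not part of the original code; the class and method names are hypothetical), a std::deque-backed buffer grows on demand instead of pre-allocating a fixed array, and mirrors the CircleBuffer interface so it could be swapped in:

#include <algorithm>
#include <deque>
#include <mutex>

// Hypothetical growable send buffer: same role as CircleBuffer, but without a fixed capacity.
class DynamicBuffer
{
public:
    bool append(const char *data, int size)
    {
        std::lock_guard<std::mutex> lock(m_mtx);
        m_data.insert(m_data.end(), data, data + size);
        return true;                                  // only fails when memory runs out
    }
    // Copy up to max_size pending bytes into out; call free_buf() with what was actually sent.
    int get_buffer(char *out, int max_size)
    {
        std::lock_guard<std::mutex> lock(m_mtx);
        int n = std::min<int>(max_size, (int)m_data.size());
        std::copy(m_data.begin(), m_data.begin() + n, out);
        return n;
    }
    void free_buf(int size)
    {
        std::lock_guard<std::mutex> lock(m_mtx);
        m_data.erase(m_data.begin(), m_data.begin() + std::min<int>(size, (int)m_data.size()));
    }
    bool is_empty()
    {
        std::lock_guard<std::mutex> lock(m_mtx);
        return m_data.empty();
    }
private:
    std::mutex m_mtx;
    std::deque<char> m_data;
};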