LINUX高性能服务器框架

1.服务器模型:

1). C/S模型

 :


  此模型就是一个服务器能给多个客户端提供服务,但所以资源都被服务端所占有,客户端想要获取只能通过请求连接服务端去获取。由于客户端的请求访问是异步的,所以需要一个手段进行此类事件的监听,比如SELECT或者是EPOLL,可以只监听服务器套接字,将客户端的客户需求交给子进程去处理,也可以进行同时监听,将连接对面的套接字加入,进行一起监听。

2).P2P模式

 此模型放弃了以服务器为中心的观点,改为每个电脑的地位相同,既可以作为客户端,也可以作为服务端。

 

2.服务器大致框架:

不同差别服务器,其差别一般都于逻辑处理,但大体框架是一样的,如下图:

3.io模型:

I/O模型一般有如下几种:

阻塞I/O:即当客户端发起连接以后,服务端成功连接,但是服务/客户端因为某些原因无法立刻执行,从而被操作系统挂起,直到有消息或者要处理时,才会从等待队列中唤醒。可能被阻塞的I/O有accept(),connect(),send(),recv()等等。

非阻塞I/O:此类不会一直等待,比如可以设置一段事件进行等待,事件发生就立即进行返回。

同步/异步 I/O

4:两种高效的I/O模型:

1.): reactor模型(同步模型):

如上图:就绪事件准备好之后插入就绪队列,然后唤醒请求队列进行不同的工作状态进行工作。

 

以同步I/O模拟proactor模式

注意如此,用同步I/O来模拟异步I/O时,发生读事件,将数据进行封装然后储存在队列中,然后注册写事件就绪,而写事件则是队列取出进行回馈返回。

5.高效的并发模式(主要运用线程池):
1).半同步半异步:

半同步半异步指的就是同步用来处理用户逻辑,异步用来处理I/O事件:

但是上述模型有缺点:

8-11改良后的半同步/半异步模式,每个工作线程都可以通过epoll_wait()。来监听多个SOCKET进行服务。

2).领导/追逐者模式:读者能力有限,请自行 查阅资料

6.优先状态机,用来处理单元之间的逻辑问题:比如先处理什么,再处理什么,接着处理什么。

想象一个如下有限状态机:

下面是利用上述原理进行的分析HTTP请求的状态机。

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>

#define BUFFER_SIZE 4096
enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER, CHECK_STATE_CONTENT };
enum LINE_STATUS { LINE_OK = 0, LINE_BAD, LINE_OPEN };
enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, FORBIDDEN_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION };
static const char* szret[] = { "I get a correct result\n", "Something wrong\n" };

LINE_STATUS parse_line( char* buffer, int& checked_index, int& read_index )
{
    char temp;
    for ( ; checked_index < read_index; ++checked_index )
    {
        temp = buffer[ checked_index ];
        if ( temp == '\r' )
        {
            if ( ( checked_index + 1 ) == read_index )//对面传来一行数据 这“一行”数据 可能是属于HTTP头部的一部分 还没到"\n",这里判断是read_index是消息行末尾的后一个字节,checked_index已经读到对面发过来末尾了
            {
                return LINE_OPEN;
            }
            else if ( buffer[ checked_index + 1 ] == '\n' )
            {
                buffer[ checked_index++ ] = '\0';
                buffer[ checked_index++ ] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
        else if( temp == '\n' )
        {
            if( ( checked_index > 1 ) &&  buffer[ checked_index - 1 ] == '\r' )
            {
                buffer[ checked_index-1 ] = '\0';
                buffer[ checked_index++ ] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
    }
    return LINE_OPEN;
}

HTTP_CODE parse_requestline( char* szTemp, CHECK_STATE& checkstate )
{
    char* szURL = strpbrk( szTemp, " \t" );
    if ( ! szURL )
    {
        return BAD_REQUEST;
    }
    *szURL++ = '\0';

    char* szMethod = szTemp;
    if ( strcasecmp( szMethod, "GET" ) == 0 )
    {
        printf( "The request method is GET\n" );
    }
    else
    {
        return BAD_REQUEST;
    }

    szURL += strspn( szURL, " \t" );
    char* szVersion = strpbrk( szURL, " \t" );
    if ( ! szVersion )
    {
        return BAD_REQUEST;
    }
    *szVersion++ = '\0';
    szVersion += strspn( szVersion, " \t" );
    if ( strcasecmp( szVersion, "HTTP/1.1" ) != 0 )
    {
        return BAD_REQUEST;
    }

    if ( strncasecmp( szURL, "http://", 7 ) == 0 )
    {
        szURL += 7;
        szURL = strchr( szURL, '/' );
    }

    if ( ! szURL || szURL[ 0 ] != '/' )
    {
        return BAD_REQUEST;
    }

    //URLDecode( szURL );
    printf( "The request URL is: %s\n", szURL );
    checkstate = CHECK_STATE_HEADER;
    return NO_REQUEST;
}

HTTP_CODE parse_headers( char* szTemp )
{
    if ( szTemp[ 0 ] == '\0' )
    {
        return GET_REQUEST;
    }
    else if ( strncasecmp( szTemp, "Host:", 5 ) == 0 )
    {
        szTemp += 5;
        szTemp += strspn( szTemp, " \t" );
        printf( "the request host is: %s\n", szTemp );
    }
    else
    {
        printf( "I can not handle this header\n" );
    }

    return NO_REQUEST;
}

HTTP_CODE parse_content( char* buffer, int& checked_index, CHECK_STATE& checkstate, int& read_index, int& start_line )
{
    LINE_STATUS linestatus = LINE_OK;
    HTTP_CODE retcode = NO_REQUEST;
    while( ( linestatus = parse_line( buffer, checked_index, read_index ) ) == LINE_OK )
    {
        char* szTemp = buffer + start_line;
        start_line = checked_index;//连续两次++ 
        switch ( checkstate )
        {
            case CHECK_STATE_REQUESTLINE:
            {
                retcode = parse_requestline( szTemp, checkstate );
                if ( retcode == BAD_REQUEST )
                {
                    return BAD_REQUEST;
                }
                break;
            }
            case CHECK_STATE_HEADER:
            {
                retcode = parse_headers( szTemp );
                if ( retcode == BAD_REQUEST )
                {
                    return BAD_REQUEST;
                }
                else if ( retcode == GET_REQUEST )
                {
                    return GET_REQUEST;
                }
                break;
            }
            default:
            {
                return INTERNAL_ERROR;
            }
        }
    }
    if( linestatus == LINE_OPEN )
    {
        return NO_REQUEST;
    }
    else
    {
        return BAD_REQUEST;
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
    
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    
    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );
    
    int ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
    
    ret = listen( listenfd, 5 );
    assert( ret != -1 );
    
    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof( client_address );
    int fd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
    if( fd < 0 )
    {
        printf( "errno is: %d\n", errno );
    }
    else
    {
        char buffer[ BUFFER_SIZE ];
        memset( buffer, '\0', BUFFER_SIZE );
        int data_read = 0;
        int read_index = 0;
        int checked_index = 0;
        int start_line = 0;
        CHECK_STATE checkstate = CHECK_STATE_REQUESTLINE;
        while( 1 )
        {
            data_read = recv( fd, buffer + read_index, BUFFER_SIZE - read_index, 0 );
            if ( data_read == -1 )
            {
                printf( "reading failed\n" );
                break;
            }
            else if ( data_read == 0 )
            {
                printf( "remote client has closed the connection\n" );
                break;
            }
    
            read_index += data_read;
            HTTP_CODE result = parse_content( buffer, checked_index, checkstate, read_index, start_line );
            if( result == NO_REQUEST )
            {
                continue;
            }
            else if( result == GET_REQUEST )
            {
                send( fd, szret[0], strlen( szret[0] ), 0 );
                break;
            }
            else
            {
                send( fd, szret[1], strlen( szret[1] ), 0 );
                break;
            }
        }
        close( fd );
    }
    
    close( listenfd );
    return 0;
}

只需注意一点的是通过while循环不断读取请求,根据读取数据进行请求行和消息头的寻找和分析。

下面是简单的,通过epoll_wait()调用的有限状态机(四种情况)和recator模型,将监听到的事件分为读写事件的简单服务端,代码如下:

//recator
#include <sys/socket.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <iostream>
#include <netinet/in.h>
#include <assert.h>

const int MAX_EVENT=1000;
const int MAX_BUFSIZE=1024;
const int clinetnum=99999;
int epollfd=0;
struct clinet_data;

typedef int(*REPLAY_LETTER)(int,clinet_data*);
void epollset(int,int);
void clintset(clinet_data*,int);

struct clinet_data{
    int fd;
    char bufread[MAX_BUFSIZE];
    char bufwrite[MAX_BUFSIZE];
    int readend;
    int writeend;
    REPLAY_LETTER readevent;
    REPLAY_LETTER writevent;
    REPLAY_LETTER closeevent;
    clinet_data():fd(0),readend(0),writeend(0){};
};

clinet_data* dataset=new clinet_data[clinetnum];

 void closeevent(int sockfd){
  close(sockfd);
  std::cout<<"close sockfd :"<<sockfd<<std::endl;
}
void epoll_event_change(int sockfd,int mode){
    switch (mode){
    
      case 1:
      {
        epoll_ctl(epollfd,EPOLL_CTL_DEL,sockfd,NULL);
        break;
      }
      case 2:
      {
        epoll_event ee;
        ee.data.fd=sockfd;
        ee.events=EPOLLOUT;
        epoll_ctl(sockfd,EPOLL_CTL_MOD,sockfd,&ee);
       break;
      }
      case 3:
      {
          epoll_event ee;
        ee.data.fd=sockfd;
        ee.events=EPOLLIN;
        epoll_ctl(sockfd,EPOLL_CTL_MOD,sockfd,&ee);  
        break;
      }
       case 4:
       {
                struct sockaddr_in addr;
                socklen_t size=sizeof(addr);
                int connect=accept(sockfd,(struct sockaddr*)&addr,&size);
                epollset(epollfd,connect);
                clintset(dataset,connect);
                break;
       }
      default:
      break;
    }
}
int readevent(int sockfd,clinet_data* dataset){
    //size<MAX_BUFFSIZE,事件改为写 然后发送回去

    int size=recv(sockfd,dataset[sockfd].bufread,MAX_BUFSIZE-1,0);
    dataset[sockfd].readend=size;
    dataset[sockfd].writeend=0;
      if(size==0){//对面关闭
        memset(dataset[sockfd].bufread,'\0',MAX_BUFSIZE);
        std::cout<<"nodata,over"<<std::endl;
        closeevent(sockfd);
        dataset[sockfd].fd=0;
        epoll_event_change(sockfd,1);
        return 0;
    }else{
    std::cout<<"recv data  :"<<dataset[sockfd].bufread<<std::endl;
    memcpy(dataset[sockfd].bufwrite,dataset[sockfd].bufread,size);
    memset(dataset[sockfd].bufread,'\0',MAX_BUFSIZE);
    epoll_event_change(sockfd,2);
    }
    return size;
}

int writeevent(int sockfd,clinet_data* dataset){
    dataset[sockfd].writeend=dataset[sockfd].readend;
    dataset[sockfd].readend=0;
    int count=send(sockfd,dataset[sockfd].bufwrite,dataset[sockfd].writeend,0);
    epoll_event_change(sockfd,3);
    return count;
}

void  clintset(clinet_data* con,int fd){
    con[fd].readevent=readevent;
    con[fd].writevent=writeevent;
    con[fd].fd=fd;
}

int setsocknoblock(int fd){
int old=fcntl(fd,F_GETFL);
int newfd=old|O_NONBLOCK;
fcntl(old,F_SETFL,newfd);
return old;
}

void epollset(int epollfd,int fd){
    epoll_event even;
    even.data.fd=fd;
    even.events=EPOLLET|EPOLLIN;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&even);
    setsocknoblock(fd);
}

int main(int argc,char* argv[]){

    if(argc<2){
        std::cout<<"not enough"<<std::endl;
        exit(1);
    }
    int ret=0;
     struct sockaddr_in address;
      epoll_event events[MAX_EVENT]; 
    memset(&address,0,sizeof(address));
    address.sin_family=AF_INET;
    address.sin_addr.s_addr=inet_addr("127.0.0.1");
    address.sin_port=htons(atoi(argv[1]));
    int severfd=socket(PF_INET,SOCK_STREAM,0);
    assert(severfd>=0);

    epollfd=epoll_create(5);
    epollset(epollfd,severfd);
    
    ret=bind(severfd,(struct sockaddr*)&address,sizeof(address));
    assert(ret>=0);

    ret=listen(severfd,5);
    assert(ret>=0);
    clinet_data* dataset=new clinet_data[clinetnum];
    while(1){
      int numm=epoll_wait(epollfd,events,MAX_EVENT-1,5*1000);
      if(numm==0){
        std::cout<<"timeout"<<std::endl;
      }else if(numm<0){
      std::cout<<"epoll_wait error"<<std::endl;
      exit(1);
      }else{
        for(int i=0;i<numm;i++){
            int fd=events[i].data.fd;
            if(events[i].events&EPOLLIN){
                if(fd==severfd){
                epoll_event_change(fd,4);
                }else{
                     readevent(fd,dataset);
                }
            }
            else if(events[i].events&EPOLLOUT){
                writeevent(fd,dataset);
            }
        }
      }
}
delete [] dataset;
close(severfd);
return 0;
}

结合客户端代码(输入Q后,客户端断开自行测试:(作者已经测试过了,没有问题)。

#include<iostream>
#include<sys/socket.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
const int a=1024;
using namespace std;
int main(int argc,char* argv[]){
    char message[a];
struct sockaddr_in times;
if(argc!=3){
    cout<<"error()"<<endl;
    exit(1);
}
int cilsock;
int str_len;
cilsock=socket(PF_INET,SOCK_STREAM,0);
if(cilsock==-1){
    cout<<"socket()error"<<endl;
}
memset(&times,0,sizeof(times));
times.sin_family=AF_INET;
times.sin_addr.s_addr=inet_addr(argv[1]);
times.sin_port=htons(atoi(argv[2]));
if(connect(cilsock,(struct sockaddr *)&times,sizeof(times))==-1){
cout<<"connext()error()"<<endl;
}else{
puts("connect........");
}
 
while(1){
    fputs("input message",stdout);
    fgets(message,a,stdin);
 
    if(!strcmp(message,"q/n")||!strcmp(message,"Q/n"))
    break;
 
    write(cilsock,message,strlen(message));
    str_len=read(cilsock,message,a-1);
    message[str_len]=0;
    cout<<message<<endl;
}
close(cilsock);
return 0;
 
}