0
点赞
收藏
分享

微信扫一扫

远程文件夹同步:涉及 Linux、C、socket、inotify 等知识

不会弹吉他的二郎腿 2022-04-22 阅读 69
linux

你是否有时会因为手边没有U盘、无法拷贝文件而烦恼?如果两台机器上的文件夹能像绑定挂载(bind mount)一样保持一致,该多好!
在其中一个文件夹里添加、修改或删除文件,远程的文件夹也会随之同步!
我花了两三天,在Linux系统上用C语言实现了这个功能。目前只在Ubuntu 18上测试过,其他Linux系统应该也没有问题。
在这里贴出客户端和服务器端的主要源代码

Gitee gitee
GitHub github

目前只支持同步单个文件夹内的常规文件,不支持嵌套子文件夹的同步。

第一次写开源代码,喜欢的给我点个Star
客户端

//
// Created by xia on 2022/4/13.
//

#include <sys/sendfile.h>
#include "client.h"


/*
 * Create a TCP socket and connect it to the server at ip:port.
 *
 * ip:   IPv4 address in text form, parsed with inet_pton(AF_INET,...).
 * port: TCP port in host byte order; must be within 1..65535.
 *
 * Returns the connected socket descriptor, or -1 after logging the reason.
 * When connect() fails the socket is closed again so no descriptor leaks.
 * */
int createClientSocket(const char *ip,int port)
{
    int sock;
    int reuse = 1;
    struct sockaddr_in server_address;

    /* htons() would silently truncate an out-of-range value */
    if(port <= 0 || port > 65535)
    {
        LOG_ERROR("port: %d is not correct!",port);
        return -1;
    }

    memset(&server_address,0,sizeof(struct sockaddr_in));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons((uint16_t)port);
    if(inet_pton(AF_INET,ip,&server_address.sin_addr) <= 0)
    {
        LOG_ERROR("ip: %s is not correct!",ip);
        return -1;
    }
    if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
        LOG_ERROR("socket error!");
        return -1;
    }
    /* Best effort only: a failure here does not prevent connecting */
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);

    /* Connect the socket to the server address */
    if(connect(sock,(struct sockaddr*) &server_address, sizeof(server_address)) < 0){
        LOG_ERROR("Cannot connect socket to address");
        close(sock);
        return -1;
    }
    return sock;
}

/*
 * Called for an IN_CLOSE_WRITE event, i.e. a file was created or changed.
 *
 * Sends an UPDATE header (file name + size in bytes as decimal text) with
 * sendInfo() and then streams the file content to client_fd with sendfile().
 * Events without a name (event->len == 0) are ignored.
 *
 * event:     inotify event; event->name is opened relative to the current
 *            working directory.
 * client_fd: socket to the peer.
 *
 * Nothing is returned; problems are reported with LOG_WARN. The file
 * descriptor is closed on every path.
 * */
void handleCloseWrite(struct inotify_event *event,int client_fd)
{
    static char buf[BUF_SIZE];
    if(event->len == 0)
        return;

    int fd = open(event->name,O_RDONLY);
    if(fd == -1)
    {
        LOG_WARN("Could not open file: %s",event->name);
        return;
    }
    /* Determine the size, then rewind so sendfile() starts at offset 0 */
    off_t size = lseek(fd,0,SEEK_END);
    if(size == (off_t)-1 || lseek(fd,0,SEEK_SET) == (off_t)-1)
    {
        LOG_WARN("lseek failed: %s",event->name);
        close(fd);
        return;
    }
    snprintf(buf,BUF_SIZE,"%ld",(long int)size);
    ssize_t curSend,numSend = 0;
    int val = 1;
    /* Cork the socket so header and payload are coalesced into full segments */
    setsockopt(client_fd,IPPROTO_TCP,TCP_CORK,&val,sizeof(val));
    clock_t start = clock();
    if(sendInfo(client_fd,STR_CMD[UPDATE],event->name,buf))
    {
        while (numSend < size)
        {
            curSend = sendfile(client_fd,fd,NULL,(size_t)(size-numSend));
            if(curSend == -1)
            {
                /* Transient conditions: try again (busy-waits while the
                 * socket buffer is full). Anything else is a hard error. */
                if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                    continue;
                break;
            }
            /* 0 means the file ended early (e.g. it was truncated meanwhile) */
            if(curSend == 0)
                break;
            numSend += curSend;
        }
    }
    val = 0;
    setsockopt(client_fd,IPPROTO_TCP,TCP_CORK,&val,sizeof(val));
    if(numSend != size)
        LOG_WARN("file: %s has %ld bytes,but send %ld bytes",event->name,(long)size,(long)numSend);
    clock_t end = clock();
    /* clock() reports processor time, so this is CPU seconds, not wall time */
    double seconds = (double )(end-start)/CLOCKS_PER_SEC;
    double speed = (seconds > 0.0) ? (double)numSend/(1024 * 1024 * seconds) : 0.0;
    LOG_INFO("end send file: %s,use %.2f seconds,speed %.2f Mb/s",event->name,seconds,speed);
    close(fd);
}
/*
 * Called when a file inside the watched directory was deleted.
 * Forwards a DELETE command carrying the file name to the peer;
 * events without a name (event->len == 0) are ignored.
 * */
void handleDelete(struct inotify_event *event,int client_fd)
{
    if(event->len == 0)
        return;

    sendInfo(client_fd,STR_CMD[DELETE],event->name,NULL);
}
/*
 * Called when the watched directory itself was deleted.
 * Sends a DELETE_SELF command without arguments to the peer;
 * the event payload is not inspected.
 * */
void handleDeleteSelf(struct inotify_event *event,int client_fd)
{
    (void)event; /* unused: the command carries no arguments */

    sendInfo(client_fd,STR_CMD[DELETE_SELF],NULL,NULL);
}
/*
 * Called for IN_MOVED_FROM / IN_MOVED_TO events.
 *
 * An IN_MOVED_FROM event stores its cookie and old name in static storage.
 * A later IN_MOVED_TO event with the same cookie completes the pair and a
 * RENAME command (old name, new name) is sent to the peer.
 * Events without a name (event->len == 0) are ignored.
 *
 * NOTE(review): an IN_MOVED_TO whose cookie does not match (file moved in
 * from outside the watched directory) sends nothing, and an unmatched
 * IN_MOVED_FROM is never reported as a deletion.
 * */
void handleRename(struct inotify_event *event,int client_fd)
{
    static uint32_t cookie;
    static char old[BUF_SIZE];
    if(event->len == 0)
        return;

    if(event->mask & IN_MOVED_TO)
    {
        /* old[0] guards against pairing with a name that was never stored */
        if(event->cookie == cookie && old[0] != '\0')
        {
            sendInfo(client_fd,STR_CMD[RENAME],old,event->name);
            LOG_INFO("rename file from %s to %s",old,event->name);
        }
    }
    else if(event->mask & IN_MOVED_FROM)
    {
        /* Bounded copy: never write past the end of the static buffer */
        int n = snprintf(old,sizeof(old),"%s",event->name);
        if(n < 0 || (size_t)n >= sizeof(old))
        {
            LOG_WARN("file name is too long to track rename: %s",event->name);
            old[0] = '\0';
            cookie = 0;
            return;
        }
        cookie = event->cookie;
    }
}

/*
 * Dispatch a single inotify event to the matching handler(s).
 *
 * While CouldSynchronous is false nothing is forwarded and a warning is
 * logged instead. Otherwise every mask bit that is set gets its handler
 * called, in the order: close-write, delete, delete-self, move.
 * */
void disparityEvent(struct inotify_event *event,int client_fd)
{
    if(!CouldSynchronous)
    {
        LOG_WARN("since the pair is not online,file change will not synchronous");
        return;
    }

    if(event->mask & IN_CLOSE_WRITE)
    {
        handleCloseWrite(event,client_fd);
    }
    if(event->mask & IN_DELETE)
    {
        handleDelete(event,client_fd);
    }
    if(event->mask & IN_DELETE_SELF)
    {
        handleDeleteSelf(event,client_fd);
    }
    if(event->mask & IN_MOVE)
    {
        handleRename(event,client_fd);
    }
}
/*
 * Add a directory to the inotify watch list.
 *
 * inotifyFd: inotify instance descriptor.
 * dir:       path that must exist and be a directory.
 * mask:      inotify event mask passed through to inotify_add_watch().
 *
 * Returns the result of inotify_add_watch(), or -1 (after logging) when
 * dir cannot be stat'ed or is not a directory.
 * */
int addInotifyDirWatch(int inotifyFd,const char *dir,uint32_t mask)
{
    struct stat info;
    int usable = (stat(dir,&info) != -1) && S_ISDIR(info.st_mode);

    if(!usable)
    {
        LOG_ERROR("file: %s is not exists or not a dir",dir);
        return -1;
    }
    return inotify_add_watch(inotifyFd,dir,mask);
}
/*
 * Read pending inotify events from the kernel and dispatch each of them.
 *
 * Keeps reading until read() returns 0 or fails with anything other than
 * EINTR (inotifyFd is set non-blocking by main(), so the loop ends once the
 * queue is empty). One read() may return several variable-length records;
 * each record is sizeof(struct inotify_event) + event->len bytes long.
 * */
void handleInotifyEvent(int inotifyFd,int client_fd)
{
    /* The buffer is accessed through struct inotify_event pointers, so it
     * must be aligned for that type (see the example in inotify(7)). */
    static char buf[BUF_LEN]
        __attribute__ ((aligned(__alignof__(struct inotify_event))));
    ssize_t numRead;

    while (1)
    {
        numRead = read(inotifyFd,buf,BUF_LEN);
        if(numRead == -1 && errno == EINTR)
            continue;   /* interrupted by a signal: not the end of the queue */
        if(numRead <= 0)
            break;

        for(char *p = buf;p<buf + numRead;)
        {
            struct inotify_event *event = (struct inotify_event *)p;
            disparityEvent(event,client_fd);
            p += sizeof(struct inotify_event) + event->len;
        }
    }
}
/*
 * Discard everything that is currently readable on fd. Used when a header
 * is not valid and the remaining data must be dropped.
 *
 * Does nothing when isConnected(fd) is false. Otherwise reads and throws
 * away data until the socket would block or the peer has closed the
 * connection; any other read error is fatal (err_exit).
 * */
void dropData(int fd)
{
    static char buf[BUF_SIZE];
    ssize_t numRead;
    if(!isConnected(fd))
        return;

    while (TRUE)
    {
        numRead = read(fd,buf,BUF_SIZE);
        /* 0 = end of stream: without this check the loop would spin forever */
        if(numRead == 0)
            break;
        if(numRead == -1)
        {
            if(errno == EINTR)
                continue;
            if(errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else
                err_exit("read");
        }
    }
}
/*
 * An UPDATE header was received: create/overwrite the file and fill it with
 * the payload that follows on the socket.
 *
 * client_fd: socket the payload is read from.
 * filename:  file name from the header; must be a plain name without '/'
 *            because only files directly inside the synchronized directory
 *            are supported.
 * size:      payload length in bytes as decimal text.
 *
 * The header is validated before the file is opened, so a bad header never
 * truncates an existing file. Exactly `size` bytes are consumed from the
 * socket, never more. Problems are reported with LOG_WARN; a read error
 * other than EAGAIN/EWOULDBLOCK/EINTR is fatal (err_exit).
 * */
void handleTcpUpdate(int client_fd,const char *filename,const char* size)
{
    if(filename == NULL || size == NULL)
        return;

    /* The name comes from the network: refuse anything that could address
     * a path outside the current (synchronized) directory. */
    if(filename[0] == '\0' || strchr(filename,'/') != NULL)
    {
        LOG_WARN("refuse to update file outside the synchronized dir: %s",filename);
        return;
    }

    char *endPtr;
    errno = 0;
    long parsedSize = strtol(size,&endPtr,10);
    if(endPtr == size || errno == ERANGE || parsedSize < 0)
    {
        LOG_WARN("the header size is %ld,so suspend the update",parsedSize);
        return;
    }
    ssize_t totalSize = (ssize_t)parsedSize;

    int fd = open(filename,O_WRONLY|O_TRUNC|O_CREAT,0664);
    if(fd == -1)
    {
        LOG_WARN("Could not open file: %s,so suspend the update",filename);
        return;
    }

    ssize_t numWrite = 0;
    ssize_t curRead;
    static char buf[BUF_SIZE];
    int failed = 0;
    clock_t start = clock();
    while (!failed && numWrite < totalSize)
    {
        /* Never read beyond this file's payload */
        size_t want = BUF_SIZE;
        if((ssize_t)want > totalSize - numWrite)
            want = (size_t)(totalSize - numWrite);

        curRead = read(client_fd,buf,want);
        if(curRead == -1)
        {
            /* Transient: retry (busy-waits while no data is available) */
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                continue;
            else
                err_exit("read");
        }
        if(curRead <= 0)
        {
            /* 0 = peer closed the connection before the payload was complete */
            LOG_WARN("connection closed while receiving file: %s",filename);
            break;
        }

        /* write() may be short: loop until the whole chunk is on disk */
        ssize_t done = 0;
        while (done < curRead)
        {
            ssize_t curWrite = write(fd,buf + done,(size_t)(curRead - done));
            if(curWrite == -1)
            {
                if(errno == EINTR)
                    continue;
                LOG_WARN("write data to disk failed");
                failed = 1;
                break;
            }
            done += curWrite;
        }
        numWrite += done;
    }
    clock_t end = clock();
    close(fd);
    if(numWrite != totalSize)
        LOG_WARN("file: %s has %ld bytes,but received %ld bytes",filename,(long)totalSize,(long)numWrite);
    /* clock() reports processor time, so this is CPU seconds, not wall time */
    double seconds = (double )(end-start)/CLOCKS_PER_SEC;
    double speed = (seconds > 0.0) ? (double)numWrite/(1024 * 1024 * seconds) : 0.0;
    LOG_INFO("end handleTcpUpdate file: %s,use %.2f seconds,speed %.2f Mb/s",filename,seconds,speed);
}
/*
 * Delete a file on request of the peer.
 *
 * filename: name received from the network. Only plain names are accepted:
 *           anything containing '/', as well as "." and "..", is refused so
 *           that the peer cannot remove paths outside the synchronized
 *           directory.
 *
 * Failures are reported with LOG_WARN.
 * */
void handleTcpDelete(const char *filename)
{
    if(filename == NULL)
    {
        LOG_WARN("delete filename is NULL");
        return;
    }
    if(strchr(filename,'/') != NULL
       || strcmp(filename,".") == 0 || strcmp(filename,"..") == 0)
    {
        LOG_WARN("refuse to delete path outside the synchronized dir: %s",filename);
        return;
    }
    if(remove(filename) == -1)
        LOG_WARN("remove file %s failed",filename);
}
/*
 * Rename a file on request of the peer.
 *
 * old, new: names received from the network. Only plain names are accepted:
 *           anything containing '/', as well as "." and "..", is refused so
 *           that the peer cannot move files into or out of the synchronized
 *           directory.
 *
 * Failures are reported with LOG_WARN.
 * */
void handleTcpRename(const char *old,const char *new)
{
    if(old==NULL || new == NULL)
    {
        LOG_WARN("old filename or new filename is empty");
        return;
    }

    const char *names[2] = { old, new };
    for(int i = 0;i < 2;i++)
    {
        if(strchr(names[i],'/') != NULL
           || strcmp(names[i],".") == 0 || strcmp(names[i],"..") == 0)
        {
            LOG_WARN("refuse to rename path outside the synchronized dir: %s",names[i]);
            return;
        }
    }

    if(rename(old,new) == -1)
        LOG_WARN("rename file %s to %s failed",old,new);
}

/*
 * Receive one header line from the peer/server and act on it.
 *
 * The line is split on '|': the first field is the command, followed by up
 * to two arguments. MESSAGE takes a single argument (the text). After the
 * command has been handled, whatever is still readable on the socket is
 * discarded with dropData().
 *
 * NOTE(review): because of that final dropData(), a second command that
 * arrives right behind the first one is thrown away - confirm whether the
 * protocol guarantees one command per wake-up.
 * */
void handleTcpEvent(int client_fd)
{
    char buf[BUF_SIZE];
    ssize_t numRead;
    numRead = readLine(client_fd,buf,BUF_SIZE);
    if(numRead < 0)
    {
        /* readLine's contract is not visible here; a negative result is
         * treated as failure so that buf is never parsed uninitialised */
        LOG_WARN("readLine failed");
        return;
    }
    if(numRead == 0)
    {
        LOG_WARN("readLine return zero");
        return;
    }
    if(numRead == BUF_SIZE - 1) //drop the too long data
    {
        LOG_WARN("header is too long,the data will be dropped");
        dropData(client_fd);
        return;
    }
    char *cmd,*arg1 = NULL,*arg2 = NULL;
    cmd = strtok(buf,"|");

    if(cmd == NULL)
    {
        LOG_WARN("command is NULL,please check the header format");
        dropData(client_fd);
        return;
    }
    if(strcmp(cmd,STR_CMD[MESSAGE]) != 0)
    {
        arg1 = strtok(NULL,"|");
        arg2 = strtok(NULL,"|\n");
    }
    if(strcmp(cmd,STR_CMD[DELETE_SELF]) == 0)
    {
        LOG_WARN("the pair dir was deleted,turn off synchronous");
        CouldSynchronous = FALSE;
    }
    else if(strcmp(cmd,STR_CMD[UNPAIRED]) == 0)
    {
        LOG_WARN("the pair was quit,turn off synchronous");
        CouldSynchronous = FALSE;
    }
    else if(strcmp(cmd,STR_CMD[PAIRED]) == 0)
    {
        CouldSynchronous = TRUE;
        LOG_WARN("the pair is online,turn on synchronous");
    }
    else if(strcmp(cmd,STR_CMD[MESSAGE]) == 0)
    {
        /* the message text is everything up to the next '|' or newline */
        arg1 = strtok(NULL,"|\n");
        LOG_INFO("Receive Message-> %s", (arg1==NULL)?"EMPTY MESSAGE":arg1);
    }
    else if (strcmp(cmd,STR_CMD[RENAME]) == 0)
        handleTcpRename(arg1,arg2);
    else if (strcmp(cmd,STR_CMD[DELETE]) == 0)
        handleTcpDelete(arg1);
    else if(strcmp(cmd,STR_CMD[UPDATE]) == 0)
        handleTcpUpdate(client_fd,arg1,arg2);
    else
        LOG_WARN("UNKNOWN COMMAND");
    dropData(client_fd);
}
/*
 * Client entry point.
 *
 * Usage: <prog> ip port username password dirname
 *
 * Watches dirname with inotify, connects to the server, sends a CONNECTING
 * header with username/password and then multiplexes the inotify descriptor
 * and the socket with epoll:
 *   - inotify readable: local changes are forwarded to the peer;
 *   - socket readable:  a command from the peer is applied locally. The
 *     watch is removed while the command is applied and re-added afterwards.
 *     NOTE(review): presumably so that changes made on behalf of the peer
 *     are not reported back to it - confirm.
 * Any hang-up/error condition on a descriptor terminates the program.
 * */
int main(int argc,char *argv[])
{
    if(argc!=6|| strcmp(argv[1],"--help") == 0)
        usageErr("%s ip port username password dirname\n",argv[0]);

    int inotifyFd,client_fd,epoll_fd,dir_fd,port;
    char *portEnd;
    long parsedPort = strtol(argv[2],&portEnd,10);
    /* reject non-numeric input and values htons() would truncate */
    if(portEnd == argv[2] || *portEnd != '\0' || parsedPort <= 0 || parsedPort > 65535)
        usageErr("%s ip port username password dirname\n",argv[0]);
    port = (int)parsedPort;

    inotifyFd = inotify_init();
    if(inotifyFd == -1)
        errExit("create inotify failed");
    if(setNonBlock(inotifyFd) == -1)
        errExit("setNonBlock inotifyFd");

    /* dir_fd is the inotify watch descriptor of the directory */
    dir_fd = addInotifyDirWatch(inotifyFd,argv[5],IN_ALL_EVENTS);
    if( dir_fd == -1)
        errExit("failed to add %s",argv[5]);

    /* file names in events and headers are relative to the watched dir */
    if(chdir(argv[5]) == -1)
        errExit("chdir");

    epoll_fd = epoll_create(2);
    if(epoll_fd < 0)
        errExit("epoll_create");

    client_fd = createClientSocket(argv[1], port);
    if(client_fd < 0)
        errExit("createClientSocket");
    if(setNonBlock(client_fd) == -1)
        errExit("setNonBlock client_fd");

    if(addEpollInLetEvent(epoll_fd,inotifyFd) == -1)
        errExit("addEpollInLetEvent inotifyFd");
    if(addEpollInLetEvent(epoll_fd,client_fd) == -1)
        errExit("addEpollInLetEvent client_fd");

    int eventRead,i;
    struct epoll_event events[2];
    sendInfo(client_fd,STR_CMD[CONNECTING],argv[3],argv[4]);
    while (TRUE)
    {
        eventRead = epoll_wait(epoll_fd,events,2,-1);
        if(eventRead == -1)
        {
            if(errno == EINTR)
                continue;
            else
                errExit("epoll_wait");
        }
        for (i = 0; i < eventRead; ++i) {
            if(events[i].events & EPOLLIN)
            {
                if(events[i].data.fd == inotifyFd)
                    handleInotifyEvent(inotifyFd,client_fd);
                else if(events[i].data.fd == client_fd)
                {
                    if(inotify_rm_watch(inotifyFd,dir_fd) < 0)
                        errExit("inotify_rm_watch failed");
                    handleTcpEvent(client_fd);
                    dir_fd = addInotifyDirWatch(inotifyFd,argv[5],IN_ALL_EVENTS);
                    if(dir_fd < 0)
                        errExit("addInotifyDirWatch failed,probably the dir was deleted");
                }
            }
            if(events[i].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP))
            {
                int failed_fd = events[i].data.fd;
                close(failed_fd);
                close(epoll_fd);
                /* avoid closing the socket twice when it is the failed one */
                if(failed_fd != client_fd)
                    close(client_fd);
                errExit("connection was closed");
            }
        }
    }
}

服务器端
//
// Created by xia on 2022/4/15.
//
#define _GNU_SOURCE
#include "server.h"
#include "sys/wait.h"
#include "signal.h"
#include "fcntl.h"

/*
 * Clear an entry: mark slot `pos` OFFLINE, forget its pid, hand both socket
 * descriptors to safeCloseSock() and reset them to -1, and empty the stored
 * username and password.
 * */
void clearEntry(int pos)
{
    entries[pos].status = OFFLINE;
    entries[pos].pid = -1;
    safeCloseSock(entries[pos].first_fd);
    entries[pos].first_fd = -1;
    safeCloseSock(entries[pos].second_fd);
    entries[pos].second_fd = -1;
    entries[pos].username[0] = '\0';
    entries[pos].password[0] = '\0';
}

void init()
{
LOG_INFO(“init”);
for(int i=0;i<MAX_PAIRS;i++)
clearEntry(i);
}

/*
 * Find an entry by user name.
 * Returns the index of the first entry whose username equals `name`,
 * or -1 when no entry matches.
 * */
int findEntryByName(const char *name)
{
    int slot = 0;

    while(slot < MAX_PAIRS)
    {
        if(!strcmp(entries[slot].username,name))
            return slot;
        ++slot;
    }
    return -1;
}

/*
 * Find an entry by the pid stored in it.
 * Returns the index of the first entry whose pid equals `pid`,
 * or -1 when no entry matches.
 * */
int findEntryByPid(pid_t pid)
{
    int slot = 0;

    while(slot < MAX_PAIRS)
    {
        if(entries[slot].pid == pid)
            return slot;
        ++slot;
    }
    return -1;
}
/*
 * Find an entry that may be (re)used. RUNNING entries are never returned.
 * The whole table is searched for an OFFLINE entry first, then for a SINGLE
 * one, then for a COUPLE one; the first hit is returned.
 * Returns -1 when no such entry exists.
 * */
int findUnusedEntry()
{
    /* statuses in order of preference */
    const int preference[3] = { OFFLINE, SINGLE, COUPLE };

    for(int rank = 0;rank < 3;++rank)
    {
        for(int slot = 0;slot < MAX_PAIRS;++slot)
        {
            if(entries[slot].status == preference[rank])
                return slot;
        }
    }
    return -1;
}

/*
 * Re-evaluate one entry from the state of its two connections.
 *
 *  - both connected and status RUNNING: nothing to do;
 *  - exactly one connected: the surviving descriptor is kept as first_fd,
 *    the dead one is closed and forgotten, and the entry becomes SINGLE
 *    with no pid;
 *  - anything else: the entry is cleared.
 *
 * Assumes isConnected() returns 1 for a connected descriptor and 0
 * otherwise (the results are added up and compared with 1 and 2).
 * */
void updateOneEntry(int pos)
{
    int first = isConnected(entries[pos].first_fd);
    int second = isConnected(entries[pos].second_fd);
    if(first + second == 2 && entries[pos].status == RUNNING)
        return;
    else if(first + second == 1)
    {
        if(second == 1)
        {
            /* release the dead first connection before replacing it */
            safeCloseSock(entries[pos].first_fd);
            entries[pos].first_fd = entries[pos].second_fd;
        }
        else
        {
            /* release the dead second connection instead of leaving a
             * stale descriptor in the entry */
            safeCloseSock(entries[pos].second_fd);
        }
        entries[pos].second_fd = -1;
        entries[pos].status = SINGLE;
        entries[pos].pid = -1;
        //sendInfo(entries[pos].first_fd,STR_CMD[UNPAIRED],NULL,NULL);
        LOG_INFO("entry %d exist one established connection,name: %s",pos,entries[pos].username);
        return;
    }
    clearEntry(pos);
}
/*

  • update entry,if there are one established connection
  • update the status to single
  • /
    void updateAllEntry()
    {
    LOG_INFO(“start updateEntry”);
    for(int i=0;i<MAX_PAIRS;i++)
    updateOneEntry(i);
    LOG_INFO(“end updateEntry”);
    }
    /
  • insert an entry by name and password
  • return -1 if insert fail
  • return the pos of entry
  • check the status of entry to determine the next step
  • */
    int insertEntry(const char *name,const char password,int fd)
    {
    if(name == NULL || password == NULL)
    return -1;
    updateAllEntry();
    int pos = findEntryByName(name);
    if(pos==-1)
    {
    LOG_INFO(“Couldn’t find name: %s,try to find unused position”,name);
    pos = findUnusedEntry();
    if(pos == -1)
    {
    LOG_WARN(“Couldn’t find an unused position to name: %s”,name);
    return -1;
    }
    clearEntry(pos);
    strcpy(entries[pos].username,name);
    strcpy(entries[pos].password,password);
    entries[pos].status = SINGLE;
    entries[pos].first_fd = fd;
    entries[pos].second_fd = -1;
    return pos;
    }
    else if(entries[pos].status == SINGLE)
    {
    if(strcmp(entries[pos].password,password) == 0 )
    {
    entries[pos].status = COUPLE;
    entries[pos].second_fd = fd;
    return pos;
    } else
    LOG_WARN(“password for name: %s is not correct”,name);
    }
    return -1;
    }
    /*
     * Update the entry that belongs to an exited subprocess.
     * Looks the entry up by pid and re-evaluates it with updateOneEntry();
     * a warning is logged when no entry carries that pid.
     * */
    void clearEntryById(pid_t pid)
    {
        int pos = findEntryByPid(pid);
        if(pos == -1)
        {
            LOG_WARN("Couldn't find entry for pid: %d",pid);
            return;
        }
        updateOneEntry(pos);
    }
    /*
     * SIGCHLD handler: when a subprocess exits, update the entry status.
     * Reaps every terminated child without blocking (WNOHANG) and updates
     * the matching entry for each of them.
     *
     * NOTE(review): the entry update logs and touches the shared table from
     * signal context; confirm that this cannot interleave badly with the
     * main loop.
     * */
    void sigChildHandler(int sig)
    {
        /* waitpid() and the calls below may change errno; restore it so the
         * interrupted code does not observe a foreign error value */
        int savedErrno = errno;
        pid_t pid;
        (void)sig;
        while ((pid = waitpid(-1,NULL,WNOHANG)) > 0)
            clearEntryById(pid);
        errno = savedErrno;
    }

/*
 * Create a listening TCP socket bound to ip:port.
 *
 * ip:   IPv4 address in text form, parsed with inet_pton(AF_INET,...).
 * port: TCP port in host byte order.
 *
 * Returns the listening socket descriptor (backlog 5), or -1 after logging
 * the reason. The socket is closed again when bind() or listen() fails.
 * */
int createServerSocket(const char *ip,int port)
{
    int sock;
    int reuse = 1;

    struct sockaddr_in server_address;
    memset(&server_address,0,sizeof(struct sockaddr_in));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(port);
    if(inet_pton(AF_INET,ip,&server_address.sin_addr) <= 0)
    {
        LOG_ERROR("ip: %s is not correct!",ip);
        return -1;
    }

    if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
        LOG_ERROR("socket error!");
        return -1;
    }
    /* Address can be reused instantly after program exits */
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);
    /* Bind socket to server address */
    if(bind(sock,(struct sockaddr*) &server_address, sizeof(server_address)) < 0){
        LOG_ERROR("Cannot bind socket to address");
        close(sock);
        return -1;
    }
    if(listen(sock,5) < 0){
        LOG_ERROR("Cannot listen on socket");
        close(sock);
        return -1;
    }
    return sock;
}
/*

  • transmit the data between the pairs
  • /
    void transmit(int in_fd,int out_fd,int pipe_out_fd,int pipe_in_fd)
    {
    static ssize_t total = 0;
    ssize_t totalReceived,totalSend;
    totalReceived = splice(in_fd, NULL, pipe_out_fd, NULL,
    1<<16, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
    if (totalReceived == -1) {
    if(errno != EAGAIN && errno != EWOULDBLOCK)
    err_exit(“splice”);
    }
    totalSend = splice(pipe_in_fd, NULL, out_fd, NULL,
    1<<16, SPLICE_F_MOVE | SPLICE_F_NONBLOCK | SPLICE_F_MORE);
    if (totalSend == -1) {
    if(errno != EAGAIN && errno != EWOULDBLOCK)
    err_exit(“splice”);
    }
    if(totalReceived != totalSend)
    LOG_WARN(“Server received %ld bytes,Send %ld bytes”,totalReceived,totalSend);
    else
    LOG_INFO(“Server received %ld bytes,Send %ld bytes”,totalReceived,totalSend);
    total += totalSend;
    LOG_INFO(“total is %f”,total/(1024
    1024.0));
    }

/*
 * Serve one paired entry; main() calls this in the forked child.
 *
 * Sends PAIRED to both peers, then relays data between the two sockets
 * with transmit() whenever one of them becomes readable. Both directions
 * share one pipe as the splice() intermediate buffer. Does not return:
 * the loop only ends through err_exit()/errExit(), e.g. when a hang-up or
 * error condition is reported for one of the sockets.
 * */
void handleEntry(int pos)
{
    int first_fd=entries[pos].first_fd,second_fd = entries[pos].second_fd;
    if(isConnected(first_fd) + isConnected(second_fd) !=2)
        err_exit("couple fd is not all open!");
    sendInfo(first_fd,STR_CMD[PAIRED],NULL,NULL);
    sendInfo(second_fd,STR_CMD[PAIRED],NULL,NULL);
    int epoll_fd;
    epoll_fd = epoll_create(2);
    if(epoll_fd == -1)
        err_exit("epoll_create");

    if(addEpollInLevelEvent(epoll_fd,first_fd) == -1)
        err_exit("epoll_ctl first_fd.");

    if(addEpollInLevelEvent(epoll_fd,second_fd) == -1)
        err_exit("epoll_ctl second_fd.");
    int pipe_fd[2];
    if(pipe(pipe_fd) == -1)
        err_exit("piped failed");
    int eventRead,i;
    struct epoll_event events[2];
    while (TRUE)
    {
        eventRead = epoll_wait(epoll_fd,events,2,-1);
        if(eventRead == -1)
        {
            if(errno == EINTR)
                continue;
            else
                err_exit("epoll_wait");
        }
        for (i = 0; i < eventRead; ++i) {
            if(events[i].events & EPOLLIN)
            {
                /* pipe_fd[1] is the write end, pipe_fd[0] the read end */
                if(events[i].data.fd == first_fd)
                    transmit(first_fd,second_fd,pipe_fd[1],pipe_fd[0]);

                else if(events[i].data.fd == second_fd)
                    transmit(second_fd,first_fd,pipe_fd[1],pipe_fd[0]);
            }
            if(events[i].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP))
            {
                close(events[i].data.fd);
                errExit("connection was closed");
            }
        }
    }

}

int main(int argc,char *argv[])
{
if(argc!=3 || strcmp(argv[1],“–help”) == 0)
usageErr(“%s ip port\n”,argv[0]);
int port = (int)strtol(argv[2],NULL,10);
signal(SIGCHLD,sigChildHandler);
init();
int server_fd = createServerSocket(argv[1],port);
if(server_fd == -1)
errExit(“create_server_socket”);
struct sockaddr_in client_address;
socklen_t socklen = sizeof(struct sockaddr_in);
int client_fd;
ssize_t numRead;
pid_t pid;
char *username,*password,command;
char buf[BUF_SIZE];
LOG_INFO(“start to receive connection”);
while (TRUE)
{
socklen = sizeof(struct sockaddr_in);
client_fd = accept(server_fd,(struct sockaddr
)&client_address,&socklen);
if(client_fd < 0)
errExit(“accept failed”);
if(inet_ntop(AF_INET,&client_address.sin_addr,buf,BUF_SIZE)!=NULL)
LOG_INFO(“Receive connection from (%s,%u)”,buf, ntohs(client_address.sin_port));
numRead = readLine(client_fd,buf,BUF_SIZE);
if(numRead == 0)
{
LOG_WARN(“readLine return zero”);
close(client_fd);
continue;
}
if(numRead == BUF_SIZE - 1) //drop the too long data
{
LOG_WARN(“header is too long,the data will”);
close(client_fd);
continue;
}

    command = strtok(buf,"|\n");
    if(strcmp(command,STR_CMD[CONNECTING]) != 0)
    {
        errMsg("Command is not right!");
        sendMsg(client_fd,"%s","Command is not right!");
        close(client_fd);
        continue;
    }
    username = strtok(NULL,"|\n");
    password = strtok(NULL,"|\n");
    int pos = insertEntry(username,password,client_fd);
    if( pos == -1)
    {
        sendMsg(client_fd,"%s","connect error,check the name and password");
        LOG_WARN("insert user: %s,password: %s failed",username,password);
        close(client_fd);
        continue;
    }
    LOG_INFO("user: %s,Status: %d",username,entries[pos].status);
    if(entries[pos].status == COUPLE)
    {
        pid = fork();
        if(pid < 0)
            errExit("fork");
        if(pid == 0)
        {
            close(server_fd);
            handleEntry(pos);
            _exit(0);
        }
        else
        {
            entries[pos].pid = pid;
            entries[pos].status = RUNNING;
            LOG_INFO("user: %s is Running,pid: %d",username,entries[pos].pid);
        }
    }
    else if(entries[pos].status == SINGLE)
    {
        sendMsg(client_fd,"Server: Connect success,waiting "
                                              "the pair to connect");
        sendInfo(client_fd,STR_CMD[UNPAIRED],NULL,NULL);
    }

}

}

举报

相关推荐

0 条评论