0
点赞
收藏
分享

微信扫一扫

在C程序中实现线程间事件消峰处理的简单示例 - 超出事件放回队列

在一个C程序中,除了main之外,有2个子线程,线程A不断产生新的事件,通过线程间消息队列将事件不断发送给线程B。线程B有一个while循环,不断从线程间消息队列中获取线程A发来的事件,并逐个处理每个事件。现在的问题是,如何在线程B的while循环中,实现事件的消峰处理? 不要采用pthread cond等处理。本问题关心的是如何在线程B的while循环中实现消峰处理。比如,可以在while循环体中,每处理一个事件时,都获取以下当前的unix时间秒数,在同一秒中最多只处理100个事件,如果发现unix时间秒数已经变化,则重新计数。请基于此思路解答。

在这个场景中,我们需要在线程B的循环中实现消峰处理,以控制每秒最多处理100个事件。具体实现步骤如下:

  1. 定义变量:在循环开始前定义几个变量来记录当前时间、处理的事件数量以及每秒处理的限制。
  2. 获取当前时间:在每次迭代开始时,获取当前的Unix时间戳。
  3. 检查时间变化:与上一次记录的时间进行比较,如果时间变化,重置事件计数。
  4. 处理事件:在处理每个事件时,先检查当前已处理的事件数量,如果已达上限,则暂时停止处理,直到新的时间段开始。

以下是示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

// 假设事件的定义
typedef struct {
    int id; // 事件ID
} Event;

typedef struct {
    Event* events; // 事件数组
    int front; // 队头
    int rear; // 队尾
    int maxSize; // 最大容量
    pthread_mutex_t mutex; // 互斥锁
} MessageQueue;

// 队列相关函数
void initQueue(MessageQueue* queue, int size) {
    queue->events = (Event*)malloc(size * sizeof(Event));
    queue->front = 0;
    queue->rear = 0;
    queue->maxSize = size;
    pthread_mutex_init(&queue->mutex, NULL);
}

int enqueue(MessageQueue* queue, Event event) {
    pthread_mutex_lock(&queue->mutex);
    if ((queue->rear + 1) % queue->maxSize == queue->front) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列满
    }
    queue->events[queue->rear] = event;
    queue->rear = (queue->rear + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功入队
}

int dequeue(MessageQueue* queue, Event* event) {
    pthread_mutex_lock(&queue->mutex);
    if (queue->front == queue->rear) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列空
    }
    *event = queue->events[queue->front];
    queue->front = (queue->front + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功出队
}

// 线程B的处理函数
void* threadB(void* arg) {
    MessageQueue* queue = (MessageQueue*)arg;
    int maxEventsPerSecond = 100;
    int eventCount = 0;
    time_t lastTime = time(NULL);

    while (1) {
        Event event;
        // 尝试从队列中获取事件
        if (dequeue(queue, &event) == 0) {
            time_t currentTime = time(NULL);
            // 检查时间是否变化
            if (currentTime != lastTime) {
                lastTime = currentTime; // 更新最后处理时间
                eventCount = 0; // 重置计数
            }
            // 处理事件
            if (eventCount < maxEventsPerSecond) {
                printf("处理事件 ID: %d\n", event.id);
                eventCount++;
            } else {
                // 超过限制,暂时停止处理
                usleep(100000); // 暂停100毫秒后重试
            }
        } else {
            // 如果队列为空,短暂休眠,避免忙等待
            usleep(10000); // 暂停10毫秒
        }
    }
    return NULL;
}

int main() {
    MessageQueue queue;
    initQueue(&queue, 1000); // 初始化消息队列

    pthread_t threadB_id;
    pthread_create(&threadB_id, NULL, threadB, (void*)&queue);

    // 线程A产生事件的示例
    for (int i = 0; i < 1000; i++) {
        Event event = { i };
        enqueue(&queue, event);
        usleep(50000); // 每50毫秒产生一个事件
    }

    pthread_join(threadB_id, NULL);
    free(queue.events);
    return 0;
}

代码说明:

  1. MessageQueue:定义了一个简单的线程安全消息队列,包含入队和出队操作。
  2. threadB:在这个线程中,使用了一个while循环来处理事件。它会检查当前时间与上一次处理的时间,若时间改变,则重置计数器。
  3. 事件处理限制:在处理事件时,检查当前事件计数,若达到限制则使用 usleep 暂停一段时间,以防止处理过多事件。

上述回答的代码不正确。因为期待的关键是,如果在同一秒中内,可以最多处理100个事件。假如同一秒中的事件数超过100个时,则在该秒钟内,不能再处理更多事件。但是,在紧接着的下一秒内,是可以处理多出来的事件的。你的代码没有准确实现这个关键期待。请重新生成代码。

我们需要确保在同一秒内最多处理100个事件,并且在下一秒可以继续处理超出的事件。下面是经过修正的代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

#define MAX_EVENTS_PER_SECOND 100

// 假设事件的定义
typedef struct {
    int id; // 事件ID
} Event;

typedef struct {
    Event* events; // 事件数组
    int front; // 队头
    int rear; // 队尾
    int maxSize; // 最大容量
    pthread_mutex_t mutex; // 互斥锁
} MessageQueue;

// 队列相关函数
void initQueue(MessageQueue* queue, int size) {
    queue->events = (Event*)malloc(size * sizeof(Event));
    queue->front = 0;
    queue->rear = 0;
    queue->maxSize = size;
    pthread_mutex_init(&queue->mutex, NULL);
}

int enqueue(MessageQueue* queue, Event event) {
    pthread_mutex_lock(&queue->mutex);
    if ((queue->rear + 1) % queue->maxSize == queue->front) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列满
    }
    queue->events[queue->rear] = event;
    queue->rear = (queue->rear + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功入队
}

int dequeue(MessageQueue* queue, Event* event) {
    pthread_mutex_lock(&queue->mutex);
    if (queue->front == queue->rear) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列空
    }
    *event = queue->events[queue->front];
    queue->front = (queue->front + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功出队
}

// 线程B的处理函数
void* threadB(void* arg) {
    MessageQueue* queue = (MessageQueue*)arg;
    int eventCount = 0;
    time_t lastTime = time(NULL);

    while (1) {
        Event event;
        // 尝试从队列中获取事件
        if (dequeue(queue, &event) == 0) {
            time_t currentTime = time(NULL);
            
            // 检查时间是否变化
            if (currentTime != lastTime) {
                lastTime = currentTime; // 更新最后处理时间
                eventCount = 0; // 重置计数
            }
            
            // 处理事件
            if (eventCount < MAX_EVENTS_PER_SECOND) {
                printf("处理事件 ID: %d\n", event.id);
                eventCount++;
            } else {
                // 超过限制,不处理当前事件,继续下一轮
                enqueue(queue, event); // 将事件放回队列
            }
        } else {
            // 如果队列为空,短暂休眠,避免忙等待
            usleep(10000); // 暂停10毫秒
        }
    }
    return NULL;
}

int main() {
    MessageQueue queue;
    initQueue(&queue, 1000); // 初始化消息队列

    pthread_t threadB_id;
    pthread_create(&threadB_id, NULL, threadB, (void*)&queue);

    // 线程A产生事件的示例
    for (int i = 0; i < 1000; i++) {
        Event event = { i };
        enqueue(&queue, event);
        usleep(50000); // 每50毫秒产生一个事件
    }

    pthread_join(threadB_id, NULL);
    free(queue.events);
    return 0;
}

代码说明:

  1. MessageQueue:和之前一样,实现了线程安全的消息队列。
  2. threadB:在这个线程中,循环获取事件并处理。如果在同一秒内已处理100个事件,则将当前事件放回队列,等待下一秒再次处理。
  3. 事件处理限制:每次从队列获取事件时,首先检查当前秒数和已处理事件数量,确保在同一秒内不会处理超过100个事件。

我们需要确保在同一秒内最多处理100个事件,并且在下一秒可以继续处理超出的事件。下面是经过修正的代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

#define MAX_EVENTS_PER_SECOND 100

// 假设事件的定义
typedef struct {
    int id; // 事件ID
} Event;

typedef struct {
    Event* events; // 事件数组
    int front; // 队头
    int rear; // 队尾
    int maxSize; // 最大容量
    pthread_mutex_t mutex; // 互斥锁
} MessageQueue;

// 队列相关函数
void initQueue(MessageQueue* queue, int size) {
    queue->events = (Event*)malloc(size * sizeof(Event));
    queue->front = 0;
    queue->rear = 0;
    queue->maxSize = size;
    pthread_mutex_init(&queue->mutex, NULL);
}

int enqueue(MessageQueue* queue, Event event) {
    pthread_mutex_lock(&queue->mutex);
    if ((queue->rear + 1) % queue->maxSize == queue->front) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列满
    }
    queue->events[queue->rear] = event;
    queue->rear = (queue->rear + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功入队
}

int dequeue(MessageQueue* queue, Event* event) {
    pthread_mutex_lock(&queue->mutex);
    if (queue->front == queue->rear) {
        pthread_mutex_unlock(&queue->mutex);
        return -1; // 队列空
    }
    *event = queue->events[queue->front];
    queue->front = (queue->front + 1) % queue->maxSize;
    pthread_mutex_unlock(&queue->mutex);
    return 0; // 成功出队
}

// 线程B的处理函数
void* threadB(void* arg) {
    MessageQueue* queue = (MessageQueue*)arg;
    int eventCount = 0;
    time_t lastTime = time(NULL);

    while (1) {
        Event event;
        // 尝试从队列中获取事件
        if (dequeue(queue, &event) == 0) {
            time_t currentTime = time(NULL);
            
            // 检查时间是否变化
            if (currentTime != lastTime) {
                lastTime = currentTime; // 更新最后处理时间
                eventCount = 0; // 重置计数
            }
            
            // 处理事件
            if (eventCount < MAX_EVENTS_PER_SECOND) {
                printf("处理事件 ID: %d\n", event.id);
                eventCount++;
            } else {
                // 超过限制,不处理当前事件,继续下一轮
                enqueue(queue, event); // 将事件放回队列
            }
        } else {
            // 如果队列为空,短暂休眠,避免忙等待
            usleep(10000); // 暂停10毫秒
        }
    }
    return NULL;
}

int main() {
    MessageQueue queue;
    initQueue(&queue, 1000); // 初始化消息队列

    pthread_t threadB_id;
    pthread_create(&threadB_id, NULL, threadB, (void*)&queue);

    // 线程A产生事件的示例
    for (int i = 0; i < 1000; i++) {
        Event event = { i };
        enqueue(&queue, event);
        usleep(50000); // 每50毫秒产生一个事件
    }

    pthread_join(threadB_id, NULL);
    free(queue.events);
    return 0;
}

代码说明:

  1. MessageQueue:和之前一样,实现了线程安全的消息队列。
  2. threadB:在这个线程中,循环获取事件并处理。如果在同一秒内已处理100个事件,则将当前事件放回队列,等待下一秒再次处理。
  3. 事件处理限制:每次从队列获取事件时,首先检查当前秒数和已处理事件数量,确保在同一秒内不会处理超过100个事件。


举报

相关推荐

0 条评论