Test the performance of the I/O queue, using typical producer consumer test. The test should examine the effect of using multiple threads on the performance.
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>
 
#if INCLUDE_IOQUEUE_PERF_TEST
 
#ifdef _MSC_VER
#   pragma warning ( disable: 4204)     
#endif
 
#define THIS_FILE       "ioq_perf"
#define TRACE_(expr)
 
 
static unsigned last_error_counter;
 
#define LIMIT_TRANSFER  0
 
#define IS_ERROR_SILENCED(e)    ((e)==PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
 
typedef struct test_item
{
    const char          *type_name;
                         client_fd;
                        *client_key;
                         send_op;
    int                  has_pending_send;
    char                *outgoing_buffer;
    char                *incoming_buffer;
                         bytes_recv;
} test_item;
 
{
    int data_is_available = 1;
 
    
 
    do {
        if (thread_quit_flag)
            return;
 
        if (bytes_read < 0) {
 
            if (rc != last_error) {
                
 
                
                if (!IS_ERROR_SILENCED(rc)) {
                    PJ_LOG(3,(THIS_FILE,
"...error: read error, bytes_read=%d (%s)",
 
                              bytes_read, errmsg));
                              ".....additional info: type=%s, total read=%u, "
                              "total sent=%u",
                              item->type_name, item->bytes_recv,
                              item->bytes_sent));
                }
            } else {
                last_error_counter++;
            }
            bytes_read = 0;
 
        } else if (bytes_read == 0) {
            PJ_LOG(3,(THIS_FILE, 
"...socket has closed!"));
 
        }
 
        item->bytes_recv += bytes_read;
    
        
        if (LIMIT_TRANSFER && item->bytes_recv>item->buffer_size*LIMIT_TRANSFER)
            thread_quit_flag = 1;
 
        bytes_read = item->buffer_size;
                              item->incoming_buffer, &bytes_read, 0 );
 
            data_is_available = 1;
            data_is_available = 0;
        } else {
            data_is_available = 0;
            if (rc != last_error) {
                last_error = rc;
                app_perror("...error: read error(1)", rc);
            } else {
                last_error_counter++;
            }
        }
 
        if (!item->has_pending_send) {
                                 item->outgoing_buffer, &sent, 0);
                app_perror("...error: write error", rc);
                item->bytes_sent += sent;
            }
 
        }
 
    } while (data_is_available);
}
 
{
    
    
 
    if (thread_quit_flag)
        return;
 
    if (bytes_sent <= 0) {
        if (!IS_ERROR_SILENCED(-bytes_sent)) {
                          "...error: sending stopped. bytes_sent=%d",
                         -bytes_sent));
        }
        item->has_pending_send = 0;
    } 
    else if (!item->has_pending_send) {
 
        item->bytes_sent += bytes_sent;
        bytes_sent = item->buffer_size;
                              item->outgoing_buffer, &bytes_sent, 0);
            app_perror("...error: write error", rc);
            item->bytes_sent += bytes_sent;
        }
 
    }
}
 
struct thread_arg
{
    int           id;
    unsigned      loop_cnt,
                  err_cnt,
                  event_cnt;
};
 
static int worker_thread(void *p)
{
    struct thread_arg *arg = (struct thread_arg*) p;
    int rc;
 
    while (!thread_quit_flag) {
 
        ++arg->loop_cnt;
        
        if (rc < 0) {
                       "...error in pj_ioqueue_poll() in thread %d "
                       "after %d loop: %s [pj_status_t=%d]", 
                       arg->id, arg->loop_cnt, errmsg, -rc));
            
            ++arg->err_cnt;
        } else if (rc > 0) {
            ++arg->event_cnt;
        }
    }
    return 0;
}
 
                        int sock_type, const char *type_name,
                        unsigned thread_cnt, unsigned sockpair_cnt,
{
    enum { MSEC_DURATION = 5000 };
    test_item *items;
    struct thread_arg *args;
    pj_size_t total_elapsed_usec, total_received;
 
    pj_highprec_t bandwidth;
    unsigned i;
 
    TRACE_((THIS_FILE, "    starting test.."));
 
 
    thread_quit_flag = 0;
 
    if (!pool)
        return -10;
 
    items = (test_item*) 
pj_pool_calloc(pool, sockpair_cnt, 
sizeof(test_item));
 
    TRACE_((THIS_FILE, "     creating ioqueue.."));
        app_perror("...error: unable to create ioqueue", rc);
        return -15;
    }
 
    
    for (i=0; i<sockpair_cnt; ++i) {
 
        items[i].type_name = type_name;
        items[i].ioqueue = ioqueue;
        items[i].buffer_size = buffer_size;
        items[i].outgoing_buffer = (
char*) 
pj_pool_alloc(pool, buffer_size);
        items[i].incoming_buffer = (
char*) 
pj_pool_alloc(pool, buffer_size);
        items[i].bytes_recv = items[i].bytes_sent = 0;
 
        
 
        
 
        
        TRACE_((THIS_FILE, "      calling socketpair.."));
                            &items[i].server_fd, &items[i].client_fd);
            app_perror("...error: unable to create socket pair", rc);
            return -20;
        }
 
        
        TRACE_((THIS_FILE, "      register(1).."));
                                      items[i].server_fd,
                                      &items[i], &ioqueue_callback,
                                      &items[i].server_key);
            app_perror("...error: registering server socket to ioqueue", rc);
            return -60;
        }
 
        
        TRACE_((THIS_FILE, "      register(2).."));
                                      items[i].client_fd,
                                      &items[i],  &ioqueue_callback,
                                      &items[i].client_key);
            app_perror("...error: registering server socket to ioqueue", rc);
            return -70;
        }
 
        
        TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
        bytes = items[i].buffer_size;
                             items[i].incoming_buffer, &bytes,
                             0);
            app_perror("...error: pj_ioqueue_recv", rc);
            return -73;
        }
 
        
        TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
        bytes = items[i].buffer_size;
                             items[i].outgoing_buffer, &bytes, 0);
            app_perror("...error: pj_ioqueue_write", rc);
            return -76;
            items[i].bytes_sent += bytes;
        }
 
    }
 
    
                                               sizeof(struct thread_arg));
    for (i=0; i<thread_cnt; ++i) {
        struct thread_arg *arg = &args[i];
        arg->id = i;
        arg->ioqueue = ioqueue;
 
                               &worker_thread, 
                               arg, 
                               PJ_THREAD_SUSPENDED, &thread[i] );
            app_perror("...error: unable to create thread", rc);
            return -80;
        }
    }
 
    
        return -90;
 
    
    TRACE_((THIS_FILE, "     resuming all threads.."));
    for (i=0; i<thread_cnt; ++i) {
        if (rc != 0)
            return -100;
    }
 
    
    TRACE_((THIS_FILE, "     wait for few seconds.."));
    do {
 
        
 
        if (thread_quit_flag) {
            TRACE_((THIS_FILE, "      transfer limit reached.."));
            break;
        }
 
            TRACE_((THIS_FILE, "      time limit reached.."));
            break;
        }
 
    } while (1);
 
    
    TRACE_((THIS_FILE, "     terminating all threads.."));
    thread_quit_flag = 1;
 
    for (i=0; i<thread_cnt; ++i) {
        TRACE_((THIS_FILE, "      join thread %d..", i));
    }
 
    
 
    
    TRACE_((THIS_FILE, "     closing all sockets.."));
    for (i=0; i<sockpair_cnt; ++i) {
    }
 
    
    for (i=0; i<thread_cnt; ++i) {
    }
 
    
    TRACE_((THIS_FILE, "     destroying ioqueue.."));
 
    
    total_received = 0;
    for (i=0; i<sockpair_cnt; ++i) {
        total_received += items[i].bytes_recv;
    }
 
    
    bandwidth = (pj_highprec_t)total_received;
    pj_highprec_mul(bandwidth, 1000);
    pj_highprec_div(bandwidth, total_elapsed_usec);
    
 
    if (display_report) {
        PJ_LOG(3,(THIS_FILE, 
"  %s %d threads, %d pairs", type_name,
 
                  thread_cnt, sockpair_cnt));
        PJ_LOG(3,(THIS_FILE, 
"  Elapsed  : %u msec", total_elapsed_usec/1000));
 
        PJ_LOG(3,(THIS_FILE, 
"  Bandwidth: %d KB/s", *p_bandwidth));
 
        PJ_LOG(3,(THIS_FILE, 
"  Threads statistics:"));
 
        PJ_LOG(3,(THIS_FILE, 
"    ============================="));
 
        PJ_LOG(3,(THIS_FILE, 
"    Thread  Loops  Events  Errors"));
 
        PJ_LOG(3,(THIS_FILE, 
"    ============================="));
 
        for (i=0; i<thread_cnt; ++i) {
            struct thread_arg *arg = &args[i];
            PJ_LOG(3,(THIS_FILE, 
" %6d  %6d  %6d  %6d",
 
                      arg->id, arg->loop_cnt, arg->event_cnt, arg->err_cnt));
        }
        PJ_LOG(3,(THIS_FILE, 
"    ============================="));
 
        PJ_LOG(3,(THIS_FILE, 
"  Socket-pair statistics:"));
 
        PJ_LOG(3,(THIS_FILE, 
"    ==================================="));
 
        PJ_LOG(3,(THIS_FILE, 
"    Pair     Sent     Recv    Pct total"));
 
        PJ_LOG(3,(THIS_FILE, 
"    ==================================="));
 
        for (i=0; i<sockpair_cnt; ++i) {
            test_item *item = &items[i];
            PJ_LOG(3,(THIS_FILE, 
"    %4d  %5.1f MB  %5.1f MB    %5.1f%%",
 
                      i, item->bytes_sent/1000000.0,
                      item->bytes_recv/1000000.0,
                      item->bytes_recv*100.0/total_received));
        }
    } else {
        PJ_LOG(3,(THIS_FILE, 
"   %.4s    %2d        %2d       %8d KB/s",
 
                  type_name, thread_cnt, sockpair_cnt,
                  *p_bandwidth));
    }
 
    
 
    TRACE_((THIS_FILE, "    done.."));
    return 0;
}
 
{
    enum { BUF_SIZE = 512 };
    int i, rc;
    struct {
        int         type;
        const char *type_name;
        int         thread_cnt;
        int         sockpair_cnt;
    } test_param[] = 
    {
 
    };
    int best_index = 0;
 
    PJ_LOG(3,(THIS_FILE, 
"   Testing with concurency=%d, epoll_flags=0x%x",
 
    PJ_LOG(3,(THIS_FILE, 
"   ======================================="));
 
    PJ_LOG(3,(THIS_FILE, 
"   Type  Threads  Skt.Pairs      Bandwidth"));
 
    PJ_LOG(3,(THIS_FILE, 
"   ======================================="));
 
 
    best_bandwidth = 0;
    for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
 
        rc = perform_test(cfg,
                          test_param[i].type, 
                          test_param[i].type_name,
                          test_param[i].thread_cnt, 
                          test_param[i].sockpair_cnt, 
                          BUF_SIZE, 
                          &bandwidth);
        if (rc != 0)
            return rc;
 
        if (bandwidth > best_bandwidth)
            best_bandwidth = bandwidth, best_index = i;
 
        
    }
 
              "   Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
              test_param[best_index].type_name,
              test_param[best_index].thread_cnt,
              test_param[best_index].sockpair_cnt,
              best_bandwidth));
    PJ_LOG(3,(THIS_FILE, 
"   (Note: packet size=%d, total errors=%u)", 
 
                         BUF_SIZE, last_error_counter));
    return 0;
}
 
int ioqueue_perf_test(void)
{
#if PJ_HAS_LINUX_EPOLL
        0,
#else
#endif
    };
    int i, rc;
 
    
 
        PJ_LOG(3,(THIS_FILE, 
" Detailed perf (concurrency=%d, epoll_flags=0x%x):",
 
        rc = perform_test(&cfg,
                          "udp",
                          8,
                          8,
                          512,
                          &bandwidth);
        if (rc != 0)
            return rc;
    }
 
    
    PJ_LOG(3,(THIS_FILE, 
" Detailed perf (concurrency=%d, epoll_flags=0x%x):",
 
    rc = perform_test(&cfg,
                      "udp",
                      8,
                      8,
                      512,
                      &bandwidth);
    if (rc != 0)
        return rc;
 
    
        int concur;
        for (concur=0; concur<2; ++concur) {
            rc = ioqueue_perf_test_imp(&cfg);
            if (rc != 0)
                return rc;
        }
    }
 
    return 0;
}
 
#else
int dummy_uiq_perf_test;
#endif  
 
 
long pj_ssize_t
Definition: types.h:64
int pj_bool_t
Definition: types.h:71
struct pj_ioqueue_t pj_ioqueue_t
Definition: types.h:210
long pj_sock_t
Definition: types.h:263
struct pj_ioqueue_key_t pj_ioqueue_key_t
Definition: types.h:216
size_t pj_size_t
Definition: types.h:58
int pj_status_t
Definition: types.h:68
struct pj_thread_t pj_thread_t
Definition: types.h:236
#define PJ_ARRAY_SIZE(a)
Definition: types.h:281
unsigned int pj_uint32_t
Definition: types.h:43
@ PJ_SUCCESS
Definition: types.h:93
@ PJ_TRUE
Definition: types.h:96
@ PJ_FALSE
Definition: types.h:99
pj_ioqueue_epoll_flag
Definition: ioqueue.h:337
void pj_ioqueue_op_key_init(pj_ioqueue_op_key_t *op_key, pj_size_t size)
pj_status_t pj_ioqueue_create2(pj_pool_t *pool, pj_size_t max_fd, const pj_ioqueue_cfg *cfg, pj_ioqueue_t **ioqueue)
pj_status_t pj_ioqueue_destroy(pj_ioqueue_t *ioque)
pj_status_t pj_ioqueue_recv(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, pj_uint32_t flags)
void * pj_ioqueue_get_user_data(pj_ioqueue_key_t *key)
pj_status_t pj_ioqueue_unregister(pj_ioqueue_key_t *key)
void pj_ioqueue_cfg_default(pj_ioqueue_cfg *cfg)
pj_status_t pj_ioqueue_send(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, pj_uint32_t flags)
int pj_ioqueue_poll(pj_ioqueue_t *ioque, const pj_time_val *timeout)
pj_status_t pj_ioqueue_register_sock(pj_pool_t *pool, pj_ioqueue_t *ioque, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key)
const char * pj_ioqueue_name(void)
@ PJ_IOQUEUE_EPOLL_AUTO
Definition: ioqueue.h:351
@ PJ_IOQUEUE_EPOLL_ONESHOT
Definition: ioqueue.h:344
@ PJ_IOQUEUE_EPOLL_EXCLUSIVE
Definition: ioqueue.h:340
#define PJ_LOG(level, arg)
Definition: log.h:106
void * pj_pool_alloc(pj_pool_t *pool, pj_size_t size)
pj_pool_t * pj_pool_create(pj_pool_factory *factory, const char *name, pj_size_t initial_size, pj_size_t increment_size, pj_pool_callback *callback)
void * pj_pool_calloc(pj_pool_t *pool, pj_size_t count, pj_size_t elem)
void pj_pool_release(pj_pool_t *pool)
char * pj_create_random_string(char *str, pj_size_t length)
#define pj_AF_INET()
Definition: sock.h:113
#define pj_SOCK_DGRAM()
Definition: sock.h:162
#define pj_SOCK_STREAM()
Definition: sock.h:160
pj_status_t pj_thread_resume(pj_thread_t *thread)
pj_status_t pj_thread_destroy(pj_thread_t *thread)
pj_status_t pj_thread_join(pj_thread_t *thread)
pj_status_t pj_thread_create(pj_pool_t *pool, const char *thread_name, pj_thread_proc *proc, void *arg, pj_size_t stack_size, unsigned flags, pj_thread_t **thread)
pj_status_t pj_thread_sleep(unsigned msec)
pj_status_t pj_get_timestamp(pj_timestamp *ts)
pj_uint32_t pj_elapsed_usec(const pj_timestamp *start, const pj_timestamp *stop)
#define PJ_THREAD_DEFAULT_STACK_SIZE
Definition: config.h:607
#define PJ_ERR_MSG_SIZE
Definition: errno.h:84
pj_str_t pj_strerror(pj_status_t statcode, char *buf, pj_size_t bufsize)
#define PJ_PERROR(level, arg)
Definition: errno.h:175
#define PJ_EPENDING
Definition: errno.h:322
Definition: udp_echo_srv_ioqueue.c:27
Definition: ioqueue.h:219
void(* on_write_complete)(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent)
Definition: ioqueue.h:246
void(* on_read_complete)(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read)
Definition: ioqueue.h:231
Definition: ioqueue.h:362
unsigned epoll_flags
Definition: ioqueue.h:370
pj_bool_t default_concurrency
Definition: ioqueue.h:379
Definition: ioqueue.h:208