Home --> Documentations --> PJLIB Reference

Test: I/O Queue Performance

Tests the performance of the I/O queue using a typical producer-consumer test. The test examines the effect of using multiple threads on performance.

This file is pjlib-test/ioq_perf.c

/* $Id: ioq_perf.c 5170 2015-08-25 08:45:46Z nanang $ */
/*
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "test.h"
#include <pjlib.h>
#include <pj/compat/high_precision.h>
#if INCLUDE_IOQUEUE_PERF_TEST
#ifdef _MSC_VER
# pragma warning ( disable: 4204) // non-constant aggregate initializer
#endif
/* Sender name passed to PJ_LOG() and TRACE_() throughout this file. */
#define THIS_FILE "ioq_perf"
/* TRACE_() expands to nothing by default, so the progress traces in this
 * file are compiled out. The commented-out definition below maps it to
 * PJ_LOG() at level 3 instead.
 */
//#define TRACE_(expr) PJ_LOG(3,expr)
#define TRACE_(expr)
/* When non-zero, the worker threads leave their polling loop and the
 * I/O callbacks return without doing any work.
 */
static pj_bool_t thread_quit_flag;
/* Status code that new read errors are compared against, so that a
 * repeated error is counted instead of being logged again.
 */
static pj_status_t last_error;
/* Number of read errors that were equal to last_error and therefore
 * only counted; reported at the end of the benchmark.
 */
static unsigned last_error_counter;
/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
pj_sock_t server_fd,
client_fd;
pj_ioqueue_t *ioqueue;
pj_ioqueue_key_t *server_key,
*client_key;
send_op;
int has_pending_send;
pj_size_t buffer_size;
char *outgoing_buffer;
char *incoming_buffer;
pj_size_t bytes_sent,
bytes_recv;
} test_item;
/* Callback when data has been read.
* Increment item->bytes_recv and ready to read the next data.
*/
static void on_read_complete(pj_ioqueue_key_t *key,
pj_ssize_t bytes_read)
{
test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
int data_is_available = 1;
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
do {
if (thread_quit_flag)
return;
if (bytes_read < 0) {
char errmsg[PJ_ERR_MSG_SIZE];
rc = (pj_status_t)-bytes_read;
if (rc != last_error) {
//last_error = rc;
pj_strerror(rc, errmsg, sizeof(errmsg));
PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)",
bytes_read, errmsg));
PJ_LOG(3,(THIS_FILE,
".....additional info: total read=%u, total sent=%u",
item->bytes_recv, item->bytes_sent));
} else {
last_error_counter++;
}
bytes_read = 0;
} else if (bytes_read == 0) {
PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
}
item->bytes_recv += bytes_read;
/* To assure that the test quits, even if main thread
* doesn't have time to run.
*/
if (item->bytes_recv > item->buffer_size * 10000)
thread_quit_flag = 1;
bytes_read = item->buffer_size;
rc = pj_ioqueue_recv( key, op_key,
item->incoming_buffer, &bytes_read, 0 );
if (rc == PJ_SUCCESS) {
data_is_available = 1;
} else if (rc == PJ_EPENDING) {
data_is_available = 0;
} else {
data_is_available = 0;
if (rc != last_error) {
last_error = rc;
app_perror("...error: read error(1)", rc);
} else {
last_error_counter++;
}
}
if (!item->has_pending_send) {
pj_ssize_t sent = item->buffer_size;
rc = pj_ioqueue_send(item->client_key, &item->send_op,
item->outgoing_buffer, &sent, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: write error", rc);
}
item->has_pending_send = (rc==PJ_EPENDING);
}
} while (data_is_available);
}
/* Callback when data has been written.
* Increment item->bytes_sent and write the next data.
*/
static void on_write_complete(pj_ioqueue_key_t *key,
pj_ssize_t bytes_sent)
{
test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
//TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
if (thread_quit_flag)
return;
item->has_pending_send = 0;
item->bytes_sent += bytes_sent;
if (bytes_sent <= 0) {
PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
bytes_sent));
}
else {
bytes_sent = item->buffer_size;
rc = pj_ioqueue_send( item->client_key, op_key,
item->outgoing_buffer, &bytes_sent, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: write error", rc);
}
item->has_pending_send = (rc==PJ_EPENDING);
}
}
/* Argument handed to each worker thread. */
struct thread_arg
{
/* Index of the thread, printed in poll error messages. */
int id;
/* I/O queue that the thread polls. */
pj_ioqueue_t *ioqueue;
/* Number of poll iterations the thread has started. */
unsigned counter;
};
/* Worker thread entry point.
 *
 * p points to a struct thread_arg. The thread polls the ioqueue given in
 * the argument until thread_quit_flag becomes non-zero, counting every
 * iteration in arg->counter. Poll failures are logged and otherwise
 * ignored. Always returns 0.
 */
static int worker_thread(void *p)
{
    struct thread_arg *self = (struct thread_arg*) p;
    const pj_time_val poll_wait = {0, 100};

    for (;;) {
        int nevents;

        if (thread_quit_flag)
            break;

        self->counter += 1;
        nevents = pj_ioqueue_poll(self->ioqueue, &poll_wait);
        //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));

        /* Zero or positive means the poll itself did not fail. */
        if (nevents >= 0)
            continue;

        {
            /* A negative result is a negated status code. */
            char errmsg[PJ_ERR_MSG_SIZE];

            pj_strerror(-nevents, errmsg, sizeof(errmsg));
            PJ_LOG(3, (THIS_FILE,
                       "...error in pj_ioqueue_poll() in thread %d "
                       "after %d loop: %s [pj_status_t=%d]",
                       self->id, self->counter, errmsg, -nevents));
            /* Keep polling; the error is not fatal for the benchmark. */
        }
    }

    return 0;
}
/* Calculate the bandwidth for the specific test configuration.
 * The test is simple:
 * - create sockpair_cnt number of producer-consumer socket pair.
 * - create thread_cnt number of worker threads.
 * - each producer will send buffer_size bytes data as fast and
 *   as soon as it can.
 * - each consumer will read buffer_size bytes of data as fast
 *   as it could.
 * - measure the total bytes received by all consumers during a
 *   period of time.
 *
 * allow_concur - default concurrency setting applied to the ioqueue.
 * sock_type    - socket type passed to app_socketpair().
 * type_name    - label printed in the result line.
 * thread_cnt   - number of worker threads to create.
 * sockpair_cnt - number of producer/consumer socket pairs to create.
 * buffer_size  - size in bytes of every send and receive buffer.
 * p_bandwidth  - receives total_received*1000/elapsed_usec, reported
 *                as KB/s.
 *
 * Returns zero on success, or a negative value identifying the step that
 * failed. NOTE: the early error returns do not release the pool, ioqueue,
 * sockets or threads created up to that point.
 */
static int perform_test(pj_bool_t allow_concur,
                        int sock_type, const char *type_name,
                        unsigned thread_cnt, unsigned sockpair_cnt,
                        pj_size_t buffer_size,
                        pj_size_t *p_bandwidth)
{
    enum { MSEC_DURATION = 5000 };
    pj_pool_t *pool;
    test_item *items;
    pj_thread_t **thread;
    pj_ioqueue_t *ioqueue;
    pj_status_t rc;
    pj_ioqueue_callback ioqueue_callback;
    pj_uint32_t total_elapsed_usec, total_received;
    pj_highprec_t bandwidth;
    pj_timestamp start, stop;
    unsigned i;

    TRACE_((THIS_FILE, " starting test.."));

    /* Zero the callback table first so that any member which is not
     * assigned below is NULL rather than stack garbage.
     */
    pj_bzero(&ioqueue_callback, sizeof(ioqueue_callback));
    ioqueue_callback.on_read_complete = &on_read_complete;
    ioqueue_callback.on_write_complete = &on_write_complete;

    thread_quit_flag = 0;

    pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
    if (!pool)
        return -10;

    items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
    thread = (pj_thread_t**)
             pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));

    TRACE_((THIS_FILE, " creating ioqueue.."));
    /* Two sockets are registered per pair. */
    rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
    if (rc != PJ_SUCCESS) {
        app_perror("...error: unable to create ioqueue", rc);
        return -15;
    }

    rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
    if (rc != PJ_SUCCESS) {
        app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
        return -16;
    }

    /* Initialize each producer-consumer pair. */
    for (i=0; i<sockpair_cnt; ++i) {
        pj_ssize_t bytes;

        items[i].ioqueue = ioqueue;
        items[i].buffer_size = buffer_size;
        items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
        items[i].bytes_recv = items[i].bytes_sent = 0;

        /* randomize outgoing buffer. */
        pj_create_random_string(items[i].outgoing_buffer, buffer_size);

        /* Create socket pair. */
        TRACE_((THIS_FILE, " calling socketpair.."));
        rc = app_socketpair(pj_AF_INET(), sock_type, 0,
                            &items[i].server_fd, &items[i].client_fd);
        if (rc != PJ_SUCCESS) {
            app_perror("...error: unable to create socket pair", rc);
            return -20;
        }

        /* Register server socket to ioqueue. */
        TRACE_((THIS_FILE, " register(1).."));
        rc = pj_ioqueue_register_sock(pool, ioqueue,
                                      items[i].server_fd,
                                      &items[i], &ioqueue_callback,
                                      &items[i].server_key);
        if (rc != PJ_SUCCESS) {
            app_perror("...error: registering server socket to ioqueue", rc);
            return -60;
        }

        /* Register client socket to ioqueue. */
        TRACE_((THIS_FILE, " register(2).."));
        rc = pj_ioqueue_register_sock(pool, ioqueue,
                                      items[i].client_fd,
                                      &items[i], &ioqueue_callback,
                                      &items[i].client_key);
        if (rc != PJ_SUCCESS) {
            app_perror("...error: registering client socket to ioqueue", rc);
            return -70;
        }

        /* Start reading. The first read is expected to be left pending
         * because nothing has been sent yet.
         */
        TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
        bytes = (pj_ssize_t)items[i].buffer_size;
        rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
                             items[i].incoming_buffer, &bytes,
                             0);
        if (rc != PJ_EPENDING) {
            app_perror("...error: pj_ioqueue_recv", rc);
            return -73;
        }

        /* Start writing. */
        TRACE_((THIS_FILE, " pj_ioqueue_write.."));
        bytes = (pj_ssize_t)items[i].buffer_size;
        rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
                             items[i].outgoing_buffer, &bytes, 0);
        if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
            app_perror("...error: pj_ioqueue_write", rc);
            return -76;
        }

        items[i].has_pending_send = (rc==PJ_EPENDING);
    }

    /* Create the threads. They are created suspended so that all of them
     * start after the start time has been taken.
     */
    for (i=0; i<thread_cnt; ++i) {
        struct thread_arg *arg;

        arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
        arg->id = i;
        arg->ioqueue = ioqueue;
        arg->counter = 0;

        rc = pj_thread_create( pool, NULL,
                               &worker_thread,
                               arg,
                               PJ_THREAD_SUSPENDED, &thread[i] );
        if (rc != PJ_SUCCESS) {
            app_perror("...error: unable to create thread", rc);
            return -80;
        }
    }

    /* Mark start time. */
    rc = pj_get_timestamp(&start);
    if (rc != PJ_SUCCESS)
        return -90;

    /* Start the thread. */
    TRACE_((THIS_FILE, " resuming all threads.."));
    for (i=0; i<thread_cnt; ++i) {
        rc = pj_thread_resume(thread[i]);
        if (rc != PJ_SUCCESS)
            return -100;
    }

    /* Wait for MSEC_DURATION milliseconds.
     * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
     * but unfortunately it doesn't work when system doesn't employ
     * timeslicing for threads.
     */
    TRACE_((THIS_FILE, " wait for few seconds.."));
    do {
        /* Yield briefly instead of spinning, so that this thread does not
         * compete with the workers for CPU time.
         */
        pj_thread_sleep(1);

        /* Mark end time. */
        rc = pj_get_timestamp(&stop);

        if (thread_quit_flag) {
            TRACE_((THIS_FILE, " transfer limit reached.."));
            break;
        }

        /* Stop once the measurement period has fully elapsed. */
        if (pj_elapsed_usec(&start,&stop) >= MSEC_DURATION * 1000) {
            TRACE_((THIS_FILE, " time limit reached.."));
            break;
        }

    } while (1);

    /* Terminate all threads. */
    TRACE_((THIS_FILE, " terminating all threads.."));
    thread_quit_flag = 1;
    for (i=0; i<thread_cnt; ++i) {
        TRACE_((THIS_FILE, " join thread %d..", i));
        pj_thread_join(thread[i]);
    }

    /* Close all sockets. */
    TRACE_((THIS_FILE, " closing all sockets.."));
    for (i=0; i<sockpair_cnt; ++i) {
        pj_ioqueue_unregister(items[i].server_key);
        pj_ioqueue_unregister(items[i].client_key);
    }

    /* Destroy threads */
    for (i=0; i<thread_cnt; ++i) {
        pj_thread_destroy(thread[i]);
    }

    /* Destroy ioqueue. */
    TRACE_((THIS_FILE, " destroying ioqueue.."));
    pj_ioqueue_destroy(ioqueue);

    /* Calculate actual time in usec. */
    total_elapsed_usec = pj_elapsed_usec(&start, &stop);
    /* Guard the division below against a zero-length measurement. */
    if (total_elapsed_usec == 0)
        total_elapsed_usec = 1;

    /* Calculate total bytes received, summed over every pair. The items
     * live in the pool, so this must happen before the pool is released.
     */
    total_received = 0;
    for (i=0; i<sockpair_cnt; ++i) {
        total_received += (pj_uint32_t)items[i].bytes_recv;
    }

    /* bandwidth = total_received*1000/total_elapsed_usec */
    bandwidth = total_received;
    pj_highprec_mul(bandwidth, 1000);
    pj_highprec_div(bandwidth, total_elapsed_usec);

    *p_bandwidth = (pj_uint32_t)bandwidth;

    /* Arguments are cast so they match the %d specifiers. */
    PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
              type_name, (int)thread_cnt, (int)sockpair_cnt,
              (int)*p_bandwidth));

    /* Done. Release the memory used by this run. */
    pj_pool_release(pool);

    TRACE_((THIS_FILE, " done.."));

    return 0;
}
/* Run perform_test() for every configuration in the table below, print a
 * result line for each one, and report the configuration that gave the
 * highest bandwidth.
 *
 * allow_concur - concurrency setting forwarded to perform_test().
 *
 * Returns zero on success, or the first non-zero value returned by
 * perform_test().
 */
static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
{
    enum { BUF_SIZE = 512 };
    int i, rc;
    struct {
        int         type;
        const char *type_name;
        int         thread_cnt;
        int         sockpair_cnt;
    } test_param[] =
    {
        { pj_SOCK_DGRAM(), "udp", 1, 1},
        { pj_SOCK_DGRAM(), "udp", 1, 2},
        { pj_SOCK_DGRAM(), "udp", 1, 4},
        { pj_SOCK_DGRAM(), "udp", 1, 8},
        { pj_SOCK_DGRAM(), "udp", 2, 1},
        { pj_SOCK_DGRAM(), "udp", 2, 2},
        { pj_SOCK_DGRAM(), "udp", 2, 4},
        { pj_SOCK_DGRAM(), "udp", 2, 8},
        { pj_SOCK_DGRAM(), "udp", 4, 1},
        { pj_SOCK_DGRAM(), "udp", 4, 2},
        { pj_SOCK_DGRAM(), "udp", 4, 4},
        { pj_SOCK_DGRAM(), "udp", 4, 8},
        { pj_SOCK_DGRAM(), "udp", 4, 16},
        { pj_SOCK_STREAM(), "tcp", 1, 1},
        { pj_SOCK_STREAM(), "tcp", 1, 2},
        { pj_SOCK_STREAM(), "tcp", 1, 4},
        { pj_SOCK_STREAM(), "tcp", 1, 8},
        { pj_SOCK_STREAM(), "tcp", 2, 1},
        { pj_SOCK_STREAM(), "tcp", 2, 2},
        { pj_SOCK_STREAM(), "tcp", 2, 4},
        { pj_SOCK_STREAM(), "tcp", 2, 8},
        { pj_SOCK_STREAM(), "tcp", 4, 1},
        { pj_SOCK_STREAM(), "tcp", 4, 2},
        { pj_SOCK_STREAM(), "tcp", 4, 4},
        { pj_SOCK_STREAM(), "tcp", 4, 8},
        { pj_SOCK_STREAM(), "tcp", 4, 16},
/*
        { pj_SOCK_DGRAM(), "udp", 32, 1},
        { pj_SOCK_DGRAM(), "udp", 32, 1},
        { pj_SOCK_DGRAM(), "udp", 32, 1},
        { pj_SOCK_DGRAM(), "udp", 32, 1},
        { pj_SOCK_DGRAM(), "udp", 1, 32},
        { pj_SOCK_DGRAM(), "udp", 1, 32},
        { pj_SOCK_DGRAM(), "udp", 1, 32},
        { pj_SOCK_DGRAM(), "udp", 1, 32},
        { pj_SOCK_STREAM(), "tcp", 32, 1},
        { pj_SOCK_STREAM(), "tcp", 32, 1},
        { pj_SOCK_STREAM(), "tcp", 32, 1},
        { pj_SOCK_STREAM(), "tcp", 32, 1},
        { pj_SOCK_STREAM(), "tcp", 1, 32},
        { pj_SOCK_STREAM(), "tcp", 1, 32},
        { pj_SOCK_STREAM(), "tcp", 1, 32},
        { pj_SOCK_STREAM(), "tcp", 1, 32},
*/
    };
    pj_size_t best_bandwidth;
    int best_index = 0;

    PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
    PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur));
    PJ_LOG(3,(THIS_FILE, " ======================================="));
    PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
    PJ_LOG(3,(THIS_FILE, " ======================================="));

    best_bandwidth = 0;
    for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
        pj_size_t bandwidth;

        rc = perform_test(allow_concur,
                          test_param[i].type,
                          test_param[i].type_name,
                          test_param[i].thread_cnt,
                          test_param[i].sockpair_cnt,
                          BUF_SIZE,
                          &bandwidth);
        if (rc != 0)
            return rc;

        if (bandwidth > best_bandwidth) {
            best_bandwidth = bandwidth;
            best_index = i;
        }

        /* Give it a rest before next test, to allow system to close the
         * sockets properly.
         */
        pj_thread_sleep(500);
    }

    /* Argument is cast so it matches the %u specifier. */
    PJ_LOG(3,(THIS_FILE,
              " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
              test_param[best_index].type_name,
              test_param[best_index].thread_cnt,
              test_param[best_index].sockpair_cnt,
              (unsigned)best_bandwidth));
    PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
              BUF_SIZE, last_error_counter));

    return 0;
}
/*
 * Main test entry: runs the benchmark with concurrency allowed first and,
 * only if that succeeds, again with concurrency disallowed. Returns zero
 * when both runs succeed, otherwise the value of the run that failed.
 */
int ioqueue_perf_test(void)
{
    int status = ioqueue_perf_test_imp(PJ_TRUE);

    if (status == 0)
        status = ioqueue_perf_test_imp(PJ_FALSE);

    return status;
}
#else
/* Placeholder definition, compiled only when INCLUDE_IOQUEUE_PERF_TEST is
 * disabled, to prevent the warning about "translation unit is empty".
 */
int dummy_uiq_perf_test;
#endif /* INCLUDE_IOQUEUE_PERF_TEST */

 


PJLIB Open Source, high performance, small footprint, and very very portable framework
Copyright (C) 2006-2009 Teluu Inc.