Thursday, November 8, 2012

Futex in Linux

Test Class for Shm

// Record placed in shared memory and used by both futex examples.
struct Test
{
        int     i_val[3];   // futex words; each waiter sleeps on one element
        int64_t time;       // timestamp written by the waker before FUTEX_WAKE
        int     seq;        // counter incremented by the waker on every round
};
 

The following is a futex WAIT example using shared memory (shm):

#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <unistd.h>

#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <iostream>

#include <Timer.h>
#include "ShmManager.h"
#include "Test.h"

//static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
//{
//      return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
//}

using namespace std;
using namespace NS_COOL;


int main(int argc, char** argv)
{
        google::InitGoogleLogging(argv[0]);

        Timer timer;
        Test * test;
        ShmManager shm_man;
        shm_man.Add("FUTEX_SHM",4096 ,false);
        shm_man.AddType("FUTEX_SHM","FUTEX_SHM_TYPE", &test);
        cout<<"Shm created"<<endl;


        int index = atoi(argv[1]);
        cout<<"test "<<test->i_val[index]<<endl;
        test->i_val[index] = 1;
        cout<<"test "<<test->i_val[index]<<endl;

        int count = 0;
        int64_t tot = 0;
        //int64_t next_seq = 1;

        int min = 1000;
        int max = 0;
        int diff = 0;

while(true)
{
//       cout<<"Waiting.."<<endl;
        //int ret = syscall(SYS_futex ,&test->i_val,FUTEX_WAIT,1,NULL,NULL,0);
        syscall(SYS_futex ,&test->i_val[index],FUTEX_WAIT,1,NULL,NULL,0);
        int64_t time  = timer.ClockGetTime().GetTime();
        /*int l_Errno = errno;
        cout<<"l_Errno="<<l_Errno<<" seq="<<test->seq<<" Time="<<time - test->time<<endl;
        if (l_Errno == ETIMEDOUT)
                cout<<"ETIMEDOUT ret="<<ret<<endl;
        if (l_Errno == EWOULDBLOCK)
                cout<<"EWOULDBLOCK ret="<<ret<<endl;
        if (l_Errno == EINTR)
                cout<<"EINTR ret="<<ret<<endl;
       

        if(next_seq != test->seq)
        {
                cout<<"Seq Missing"<<endl;
                next_seq = test->seq + 1;      
        }
        else
                ++next_seq;
        */

        ++count;
        diff = time - test->time;
        tot += diff;
        if(min > diff)
                min = diff;

        if(max < diff)
                max = diff;

        if(count % 100 == 0)
                cout<<"Avg = "<<double(tot)/double(count)<<" min="<<min<<" max="<<max<<endl;
}
        return 0;
}

The following is a futex WAKE example using shared memory (shm):

#include <linux/futex.h>
#include <sys/time.h>
#include <iostream>
#include <unistd.h>
#include <sys/syscall.h>

#include <Timer.h>
#include "ShmManager.h"
#include "Test.h"

//static long sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
//{
//      return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
//}

using namespace std;
using namespace NS_COOL;


int main(int argc, char** argv)
{
        google::InitGoogleLogging(argv[0]);

        Timer timer;
        Test * test;
        ShmManager shm_man;
        shm_man.Add("FUTEX_SHM",1024 ,false);
        shm_man.AddType("FUTEX_SHM","FUTEX_SHM_TYPE", &test);
        cout<<"Shm created"<<endl;
        sleep (10);
        test->seq = 0;

        int count = 0;
        int64_t tot = 0;
while(true)
{
        ++test->seq;
        //cout<<"Waking.."<<endl;
        test->time = timer.ClockGetTime().GetTime();
        for(int i = 0 ; i < 3 ; ++i)
                syscall(SYS_futex ,&test->i_val[i],FUTEX_WAKE,1,NULL,NULL,0);
        /*int ret = syscall(SYS_futex ,&test->i_val,FUTEX_WAKE,5,NULL,NULL,0);
        cout<<"syscall time "<<timer.ClockGetTime().GetTime() - test->time<<endl;
        int l_Errno = errno;
        //cout<<"l_Errno="<<l_Errno<<endl;
        if (l_Errno == ETIMEDOUT)
                cout<<"ETIMEDOUT ret="<<ret<<endl;
        if (l_Errno == EWOULDBLOCK)
                cout<<"EWOULDBLOCK ret="<<ret<<endl;
        if (l_Errno == EINTR)
                cout<<"EINTR ret="<<ret<<endl;
        */
        ++count;
        tot += timer.ClockGetTime().GetTime() - test->time;
        if(count % 100 == 0)
                cout<<"Avg = "<<double(tot)/double(count)<<endl;

        usleep(5000);
}


        return 0;
}

How to Bind a Process to CPU in Linux

Linux Setting processor affinity for a certain task or process

When you are using SMP (Symmetric MultiProcessing) you might want to override the kernel's process scheduling and bind a certain process to a specific CPU(s).

But what is CPU affinity?

CPU affinity is nothing but a scheduler property that "bonds" a process to a given set of CPUs on the SMP system. The Linux scheduler will honor the given CPU affinity and the process will not run on any other CPUs. Note that the Linux scheduler also supports natural CPU affinity:
The scheduler attempts to keep processes on the same CPU as long as practical for performance reasons. Therefore, forcing a specific CPU affinity is useful only in certain applications. For example, applications such as Oracle (ERP apps) are licensed by the number of CPUs per instance. You can bind Oracle to specific CPUs to avoid licensing problems. This is really useful on large servers that have 4 or 8 CPUs.

Setting processor affinity for a certain task or process using taskset command

taskset is used to set or retrieve the CPU affinity of a running process given its PID or to launch a new COMMAND with a given CPU affinity. However taskset is not installed by default. You need to install schedutils (Linux scheduler utilities) package.

Install schedutils

Debian Linux:
# apt-get install schedutils

Red Hat Enterprise Linux:
# up2date schedutils

OR
# rpm -ivh schedutils*

Under the latest versions of Debian / Ubuntu Linux, taskset is installed by default as part of the util-linux package.
The CPU affinity is represented as a bitmask, with the lowest order bit corresponding to the first logical CPU and the highest order bit corresponding to the last logical CPU. For example:
  • 0x00000001 is processor #0 (1st processor)
  • 0x00000003 is processors #0 and #1
  • 0x00000004 is processor #2 (3rd processor)
To set the processor affinity of process 13545 to processor #0 (1st processor), type the following command:
# taskset 0x00000001 -p 13545

If you find a bitmask hard to use, then you can specify a numerical list of processors instead of a bitmask using -c flag:
# taskset -c 1 -p 13545
# taskset -c 3,4 -p 13545

Where,
  • -p : Operate on an existing PID and not launch a new task (default is to launch a new task)

Wednesday, November 7, 2012

DBUS

D-Bus is an inter-process communication (IPC) open-source system for software applications to communicate with one another. Heavily influenced by KDE 2–3's DCOP system, D-Bus has replaced DCOP in the KDE 4 release. An implementation of D-Bus supports most POSIX operating systems, and a port for Windows exists. It is used by Qt 4 and GNOME. In GNOME it has gradually replaced most parts of the earlier Bonobo mechanism.
Red Hat operates as the primary developer of D-Bus as part of the freedesktop.org project.

 Example

#define DBUS_API_SUBJECT_TO_CHANGE
#include <dbus/dbus.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

/**
 * Connect to the DBUS bus and send a broadcast signal
 */
void sendsignal(char* sigvalue)
{
   DBusMessage* msg;
   DBusMessageIter args;
   DBusConnection* conn;
   DBusError err;
   int ret;
   dbus_uint32_t serial = 0;

   printf("Sending signal with value %s\n", sigvalue);

   // initialise the error value
   dbus_error_init(&err);

   // connect to the DBUS system bus, and check for errors
   conn = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Connection Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (NULL == conn) {
      exit(1);
   }

   // register our name on the bus, and check for errors
   ret = dbus_bus_request_name(conn, "test.signal.source", DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Name Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER != ret) {
      exit(1);
   }

   // create a signal & check for errors
   msg = dbus_message_new_signal("/test/signal/Object", // object name of the signal
                                 "test.signal.Type", // interface name of the signal
                                 "Test"); // name of the signal
   if (NULL == msg)
   {
      fprintf(stderr, "Message Null\n");
      exit(1);
   }

   // append arguments onto signal
   dbus_message_iter_init_append(msg, &args);
   if (!dbus_message_iter_append_basic(&args, DBUS_TYPE_STRING, &sigvalue)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }

   // send the message and flush the connection
   if (!dbus_connection_send(conn, msg, &serial)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }
   dbus_connection_flush(conn);
  
   printf("Signal Sent\n");
  
   // free the message and close the connection
   dbus_message_unref(msg);
   dbus_connection_close(conn);
}

/**
 * Call a method on a remote object
 */
void query(char* param)
{
   DBusMessage* msg;
   DBusMessageIter args;
   DBusConnection* conn;
   DBusError err;
   DBusPendingCall* pending;
   int ret;
   bool stat;
   dbus_uint32_t level;

   printf("Calling remote method with %s\n", param);

   // initialiset the errors
   dbus_error_init(&err);

   // connect to the system bus and check for errors
   conn = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Connection Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (NULL == conn) {
      exit(1);
   }

   // request our name on the bus
   ret = dbus_bus_request_name(conn, "test.method.caller", DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Name Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER != ret) {
      exit(1);
   }

   // create a new method call and check for errors
   msg = dbus_message_new_method_call("test.method.server", // target for the method call
                                      "/test/method/Object", // object to call on
                                      "test.method.Type", // interface to call on
                                      "Method"); // method name
   if (NULL == msg) {
      fprintf(stderr, "Message Null\n");
      exit(1);
   }

   // append arguments
   dbus_message_iter_init_append(msg, &args);
   if (!dbus_message_iter_append_basic(&args, DBUS_TYPE_STRING, &param)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }
  
   // send message and get a handle for a reply
   if (!dbus_connection_send_with_reply (conn, msg, &pending, -1)) { // -1 is default timeout
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }
   if (NULL == pending) {
      fprintf(stderr, "Pending Call Null\n");
      exit(1);
   }
   dbus_connection_flush(conn);
  
   printf("Request Sent\n");
  
   // free message
   dbus_message_unref(msg);
  
   // block until we recieve a reply
   dbus_pending_call_block(pending);

   // get the reply message
   msg = dbus_pending_call_steal_reply(pending);
   if (NULL == msg) {
      fprintf(stderr, "Reply Null\n");
      exit(1);
   }
   // free the pending message handle
   dbus_pending_call_unref(pending);

   // read the parameters
   if (!dbus_message_iter_init(msg, &args))
      fprintf(stderr, "Message has no arguments!\n");
   else if (DBUS_TYPE_BOOLEAN != dbus_message_iter_get_arg_type(&args))
      fprintf(stderr, "Argument is not boolean!\n");
   else
      dbus_message_iter_get_basic(&args, &stat);

   if (!dbus_message_iter_next(&args))
      fprintf(stderr, "Message has too few arguments!\n");
   else if (DBUS_TYPE_UINT32 != dbus_message_iter_get_arg_type(&args))
      fprintf(stderr, "Argument is not int!\n");
   else
      dbus_message_iter_get_basic(&args, &level);

   printf("Got Reply: %d, %d\n", stat, level);
  
   // free reply and close connection
   dbus_message_unref(msg);  
   dbus_connection_close(conn);
}

/**
 * Answer one incoming method call
 *
 * Prints the string argument of msg (an empty string if it cannot be read)
 * and sends back a method return carrying a boolean (TRUE) and a uint32
 * (21614) on conn.
 *
 * @param msg  the method call being answered
 * @param conn connection the reply is sent on
 *
 * Calls exit(1) if the reply cannot be built or queued.
 */
void reply_to_method_call(DBusMessage* msg, DBusConnection* conn)
{
   DBusMessage* reply;
   DBusMessageIter args;
   // libdbus reads a DBUS_TYPE_BOOLEAN from a dbus_bool_t, which is wider
   // than a C99 bool, so the value must be stored in that type.
   dbus_bool_t stat = TRUE;
   dbus_uint32_t level = 21614;
   dbus_uint32_t serial = 0;
   const char* param = "";

   // read the arguments
   if (!dbus_message_iter_init(msg, &args))
      fprintf(stderr, "Message has no arguments!\n");
   else if (DBUS_TYPE_STRING != dbus_message_iter_get_arg_type(&args))
      fprintf(stderr, "Argument is not string!\n");
   else
      dbus_message_iter_get_basic(&args, &param);

   printf("Method called with %s\n", param);

   // create a reply from the message
   reply = dbus_message_new_method_return(msg);
   if (NULL == reply) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }

   // add the arguments to the reply
   dbus_message_iter_init_append(reply, &args);
   if (!dbus_message_iter_append_basic(&args, DBUS_TYPE_BOOLEAN, &stat)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }
   if (!dbus_message_iter_append_basic(&args, DBUS_TYPE_UINT32, &level)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }

   // send the reply && flush the connection
   if (!dbus_connection_send(conn, reply, &serial)) {
      fprintf(stderr, "Out Of Memory!\n");
      exit(1);
   }
   dbus_connection_flush(conn);

   // free the reply
   dbus_message_unref(reply);
}

/**
 * Server that exposes a method call and waits for it to be called
 */
void listen()
{
   DBusMessage* msg;
   DBusMessage* reply;
   DBusMessageIter args;
   DBusConnection* conn;
   DBusError err;
   int ret;
   char* param;

   printf("Listening for method calls\n");

   // initialise the error
   dbus_error_init(&err);
  
   // connect to the bus and check for errors
   conn = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Connection Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (NULL == conn) {
      fprintf(stderr, "Connection Null\n");
      exit(1);
   }
  
   // request our name on the bus and check for errors
   ret = dbus_bus_request_name(conn, "test.method.server", DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Name Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER != ret) {
      fprintf(stderr, "Not Primary Owner (%d)\n", ret);
      exit(1);
   }

   // loop, testing for new messages
   while (true) {
      // non blocking read of the next available message
      dbus_connection_read_write(conn, 0);
      msg = dbus_connection_pop_message(conn);

      // loop again if we haven't got a message
      if (NULL == msg) {
         sleep(1);
         continue;
      }
     
      // check this is a method call for the right interface & method
      if (dbus_message_is_method_call(msg, "test.method.Type", "Method"))
         reply_to_method_call(msg, conn);

      // free the message
      dbus_message_unref(msg);
   }

   // close the connection
   dbus_connection_close(conn);
}

/**
 * Listens for signals on the bus
 */
void receive()
{
   DBusMessage* msg;
   DBusMessageIter args;
   DBusConnection* conn;
   DBusError err;
   int ret;
   char* sigvalue;

   printf("Listening for signals\n");

   // initialise the errors
   dbus_error_init(&err);
  
   // connect to the bus and check for errors
   conn = dbus_bus_get(DBUS_BUS_SYSTEM, &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Connection Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (NULL == conn) {
      exit(1);
   }
  
   // request our name on the bus and check for errors
   ret = dbus_bus_request_name(conn, "test.signal.sink", DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Name Error (%s)\n", err.message);
      dbus_error_free(&err);
   }
   if (DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER != ret) {
      exit(1);
   }

   // add a rule for which messages we want to see
   dbus_bus_add_match(conn, "type='signal',interface='test.signal.Type'", &err); // see signals from the given interface
   dbus_connection_flush(conn);
   if (dbus_error_is_set(&err)) {
      fprintf(stderr, "Match Error (%s)\n", err.message);
      exit(1);
   }
   printf("Match rule sent\n");

   // loop listening for signals being emmitted
   while (true) {

      // non blocking read of the next available message
      dbus_connection_read_write(conn, 0);
      msg = dbus_connection_pop_message(conn);

      // loop again if we haven't read a message
      if (NULL == msg) {
         sleep(1);
         continue;
      }

      // check if the message is a signal from the correct interface and with the correct name
      if (dbus_message_is_signal(msg, "test.signal.Type", "Test")) {
        
         // read the parameters
         if (!dbus_message_iter_init(msg, &args))
            fprintf(stderr, "Message Has No Parameters\n");
         else if (DBUS_TYPE_STRING != dbus_message_iter_get_arg_type(&args))
            fprintf(stderr, "Argument is not string!\n");
         else
            dbus_message_iter_get_basic(&args, &sigvalue);
        
         printf("Got Signal with value %s\n", sigvalue);
      }

      // free the message
      dbus_message_unref(msg);
   }
   // close the connection
   dbus_connection_close(conn);
}

/**
 * Entry point of the D-Bus example
 *
 * argv[1] selects the mode (send, receive, listen or query); the optional
 * argv[2] is the string handed to sendsignal() or query() and defaults to
 * "no param".
 *
 * @return 0 after the selected mode returns, 1 on a missing or unknown mode
 *
 * NOTE(review): strcmp() is declared in <string.h>, which the include list
 * of this example does not contain.
 */
int main(int argc, char** argv)
{
   // writable storage for the default, so it can be passed as char*
   static char default_param[] = "no param";
   char* param = default_param;

   if (2 > argc) {
      printf ("Syntax: dbus-example [send|receive|listen|query] [<param>]\n");
      return 1;
   }

   // use the caller's parameter whenever one was supplied, even if further
   // arguments follow it
   if (argc >= 3) param = argv[2];

   if (0 == strcmp(argv[1], "send"))
      sendsignal(param);
   else if (0 == strcmp(argv[1], "receive"))
      receive();
   else if (0 == strcmp(argv[1], "listen"))
      listen();
   else if (0 == strcmp(argv[1], "query"))
      query(param);
   else {
      printf ("Syntax: dbus-example [send|receive|listen|query] [<param>]\n");
      return 1;
   }
   return 0;
}





If you get the following error:


Listening for method calls
Name Error (Connection ":1.48" is not allowed to own the service "test.method.server" due to security policies in the configuration file)
Not Primary Owner (-1)





How to solve.

I went through "/etc/dbus-1/system.conf" and found these lines:
<!-- Config files are placed here that among other things, punch
holes in the above policy for specific services. -->
<includedir>system.d</includedir>

<!-- This is included last so local configuration can override what's
in this standard file -->
<include ignore_missing="yes">system-local.conf</include>

Remove this line from the config file:
<include ignore_missing="yes">system-local.conf</include>



Then create the following file:
/etc/dbus-1/system.d/system-local.conf

<!-- This configuration file relaxes the system bus security policy so that
     the D-Bus example programs can own their bus names and exchange messages.
     NOTE(review): the rules below allow every user, interface and bus name;
     presumably meant for a development machine only, confirm before use. -->

<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>

  <!-- ../system.conf has denied everything, so we just punch some holes -->

<policy context="default">
<!-- Punch the holes: allow sending and receiving on any interface and
     owning any bus name -->
<allow send_interface="*"/>
<allow receive_interface="*"/>
<allow own="*"/>
<!-- Allow all users to connect -->
<allow user="*"/>
<!-- Allow anyone to talk to the message bus -->
<!-- FIXME I think currently these allow rules are always implicit
even if they aren't in here -->
<allow send_destination="org.freedesktop.DBus"/>
<allow receive_sender="org.freedesktop.DBus"/>
<!-- Valid replies are always allowed -->
<allow send_requested_reply="true"/>
<allow receive_requested_reply="true"/>
</policy>

</busconfig>


Now you should no longer receive the above error :-D