OSDN Git Service

* gnu/java/nio/SelectorImpl.java
authormembar <membar@138bc75d-0d04-0410-961f-82ee72b054a4>
Sat, 20 Dec 2003 15:33:24 +0000 (15:33 +0000)
committermembar <membar@138bc75d-0d04-0410-961f-82ee72b054a4>
Sat, 20 Dec 2003 15:33:24 +0000 (15:33 +0000)
(selectThreadMutex): New field.
(selectThread): New field.
(unhandledWakeup): New field.
(implCloseSelector): Added skeleton code which
synchronizes as per Sun JRE JavaDoc.
(keys): Throw ClosedSelectorException if selector
is closed.
(selectNow): Added comment that we're faking out
an immediate select with a one-microsecond-timeout one.
(select): Use 0 instead of -1 for infinite timeout.
(implSelect): Changed comment in declaration.
(select): Added synchronized to method declaration.
Added synchronization and wakeup support as per Sun
JRE JavaDoc.
(selectedKeys): Throw ClosedSelectorException if selector
is closed.
(wakeup): Implemented.
(deregisterCancelledKeys): Synchronize on cancelled key
set before deregistering.
(register): Synchronize on key set before registering.
* java/nio/channels/spi/AbstractSelector.java
Added import for java.nio.channels.ClosedSelectorException.
(close): Added synchronized to method declaration.
(cancelledKeys): Throw ClosedSelectorException if selector
is closed.
(cancelKey): Synchronize on cancelled key set before key.

git-svn-id: svn+ssh://gcc.gnu.org/svn/gcc/trunk@74879 138bc75d-0d04-0410-961f-82ee72b054a4

libjava/ChangeLog
libjava/gnu/java/nio/SelectorImpl.java
libjava/java/nio/channels/spi/AbstractSelector.java

index b07307d..fb5bf5e 100644 (file)
@@ -1,3 +1,33 @@
+2003-12-20  Mohan Embar  <gnustuff@thisiscool.com>
+
+       * gnu/java/nio/SelectorImpl.java
+       (selectThreadMutex): New field.
+       (selectThread): New field.
+       (unhandledWakeup): New field.
+       (implCloseSelector): Added skeleton code which
+       synchronizes as per Sun JRE JavaDoc.
+       (keys): Throw ClosedSelectorException if selector
+       is closed.
+       (selectNow): Added comment that we're faking out
+       an immediate select with a one-microsecond-timeout one.
+       (select): Use 0 instead of -1 for infinite timeout.
+       (implSelect): Changed comment in declaration.
+       (select): Added synchronized to method declaration.
+       Added synchronization and wakeup support as per Sun
+       JRE JavaDoc.
+       (selectedKeys): Throw ClosedSelectorException if selector
+       is closed.
+       (wakeup): Implemented.
+       (deregisterCancelledKeys): Synchronize on cancelled key
+       set before deregistering.
+       (register): Synchronize on key set before registering.
+       * java/nio/channels/spi/AbstractSelector.java
+       Added import for java.nio.channels.ClosedSelectorException.
+       (close): Added synchronized to method declaration.
+       (cancelledKeys): Throw ClosedSelectorException if selector
+       is closed.
+       (cancelKey): Synchronize on cancelled key set before key.
+
 2003-12-20  Michael Koch  <konqueror@gmx.de>
 
        * Makefile.am (ordinary_java_source_files):
index 180c7ee..f26e080 100644 (file)
@@ -65,6 +65,28 @@ public class SelectorImpl extends AbstractSelector
   private Set keys;
   private Set selected;
 
+  /**
+   * A dummy object whose monitor regulates access to both our
+   * selectThread and unhandledWakeup fields.
+   */
+  private Object selectThreadMutex = new Object ();
+  
+  /**
+   * Any thread that's currently blocked in a select operation.
+   */
+  private Thread selectThread;
+  
+  /**
+   * Indicates whether we have an unhandled wakeup call. This can
+   * be due to either wakeup() triggering a thread interruption while
+   * a thread was blocked in a select operation (in which case we need
+   * to reset this thread's interrupt status after interrupting the
+   * select), or else that no thread was on a select operation at the
+   * time that wakeup() was called, in which case the following select()
+   * operation should return immediately with nothing selected.
+   */
+  private boolean unhandledWakeup;
+
   public SelectorImpl (SelectorProvider provider)
   {
     super (provider);
@@ -81,28 +103,44 @@ public class SelectorImpl extends AbstractSelector
   protected final void implCloseSelector()
     throws IOException
   {
-    // FIXME: We surely need to do more here.
+    // Cancel any pending select operation.
     wakeup();
+    
+    synchronized (keys)
+      {
+        synchronized (selected)
+          {
+            synchronized (cancelledKeys ())
+              {
+                // FIXME: Release resources here.
+              }
+          }
+      }
   }
 
   public final Set keys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return Collections.unmodifiableSet (keys);
   }
     
   public final int selectNow()
     throws IOException
   {
+    // FIXME: We're simulating an immediate select
+    // via a select with a timeout of one millisecond.
     return select (1);
   }
 
   public final int select()
     throws IOException
   {
-    return select (-1);
+    return select (0);
   }
 
-  // A timeout value of -1 means block forever.
+  // A timeout value of 0 means block forever.
   private static native int implSelect (int[] read, int[] write,
                                         int[] except, long timeout)
     throws IOException;
@@ -144,112 +182,199 @@ public class SelectorImpl extends AbstractSelector
     return result;
   }
 
-  public int select (long timeout)
+  public synchronized int select (long timeout)
     throws IOException
   {
     if (!isOpen())
-      throw new ClosedSelectorException ();
-
-    if (keys == null)
-           {
-        return 0;
-           }
-
-    deregisterCancelledKeys();
-
-    // Set only keys with the needed interest ops into the arrays.
-    int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
-    int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
-    int[] except = new int [0]; // FIXME: We dont need to check this yet
-    int anzahl = read.length + write.length + except.length;
-
-    // Call the native select() on all file descriptors.
-    begin();
-    int result = implSelect (read, write, except, timeout);
-    end();
-
-    Iterator it = keys.iterator ();
-
-    while (it.hasNext ())
+      throw new ClosedSelectorException();
+      
+    synchronized (keys)
       {
-        int ops = 0;
-        SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
-
-        // If key is already selected retrieve old ready ops.
-        if (selected.contains (key))
+        synchronized (selected)
           {
-            ops = key.readyOps ();
-          }
-
-        // Set new ready read/accept ops
-        for (int i = 0; i < read.length; i++)
-          {
-            if (key.getNativeFD() == read[i])
+            deregisterCancelledKeys();
+
+            // Set only keys with the needed interest ops into the arrays.
+            int[] read = getFDsAsArray (SelectionKey.OP_READ
+                                        | SelectionKey.OP_ACCEPT);
+            int[] write = getFDsAsArray (SelectionKey.OP_WRITE
+                                         | SelectionKey.OP_CONNECT);
+
+            // FIXME: We dont need to check this yet
+            int[] except = new int [0];
+
+            // Test to see if we've got an unhandled wakeup call,
+            // in which case we return immediately. Otherwise,
+            // remember our current thread and jump into the select.
+            // The monitor for dummy object selectThreadMutex regulates
+            // access to these fields.
+
+            // FIXME: Not sure from the spec at what point we should
+            // return "immediately". Is it here or immediately upon
+            // entry to this function?
+            
+            // NOTE: There's a possibility of another thread calling
+            // wakeup() immediately after our thread releases
+            // selectThreadMutex's monitor here, in which case we'll
+            // do the select anyway. Since calls to wakeup() and select()
+            // among different threads happen in non-deterministic order,
+            // I don't think this is an issue.
+            synchronized (selectThreadMutex)
               {
-                if (key.channel () instanceof ServerSocketChannelImpl)
+                if (unhandledWakeup)
                   {
-                    ops = ops | SelectionKey.OP_ACCEPT;
+                    unhandledWakeup = false;
+                    return 0;
                   }
                 else
                   {
-                    ops = ops | SelectionKey.OP_READ;
+                    selectThread = Thread.currentThread ();
                   }
               }
-          }
 
-        // Set new ready write ops
-        for (int i = 0; i < write.length; i++)
-          {
-            if (key.getNativeFD() == write[i])
+            // Call the native select() on all file descriptors.
+            int result = 0;
+            try
               {
-                ops = ops | SelectionKey.OP_WRITE;
-                
-//                 if (key.channel ().isConnected ())
-//                   {
-//                     ops = ops | SelectionKey.OP_WRITE;
-//                   }
-//                 else
-//                   {
-//                     ops = ops | SelectionKey.OP_CONNECT;
-//                   }
-             }
-          }
+                begin();
+                result = implSelect (read, write, except, timeout);
+              }
+            finally
+              {
+                end();
+              }
 
-        // FIXME: We dont handle exceptional file descriptors yet.
+            // If our unhandled wakeup flag is set at this point,
+            // reset our thread's interrupt flag because we were
+            // awakened by wakeup() instead of an external thread
+            // interruption.
+            //
+            // NOTE: If we were blocked in a select() and one thread
+            // called Thread.interrupt() on the blocked thread followed
+            // by another thread calling Selector.wakeup(), then race
+            // conditions could make it so that the thread's interrupt
+            // flag is reset even though the Thread.interrupt() call
+            // "was there first". I don't think we need to care about
+            // this scenario.
+            synchronized (selectThreadMutex)
+              {
+                if (unhandledWakeup)
+                  {
+                    unhandledWakeup = false;
+                    selectThread.interrupted ();
+                  }
+                selectThread = null;
+              }
 
-        // If key is not yet selected add it.
-        if (!selected.contains (key))
-          {
-            selected.add (key);
-          }
+            Iterator it = keys.iterator ();
 
-        // Set new ready ops
-        key.readyOps (key.interestOps () & ops);
-      }
+            while (it.hasNext ())
+              {
+                int ops = 0;
+                SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 
-    deregisterCancelledKeys();
-    return result;
+                // If key is already selected retrieve old ready ops.
+                if (selected.contains (key))
+                  {
+                    ops = key.readyOps ();
+                  }
+
+                // Set new ready read/accept ops
+                for (int i = 0; i < read.length; i++)
+                  {
+                    if (key.getNativeFD() == read[i])
+                      {
+                        if (key.channel () instanceof ServerSocketChannelImpl)
+                          {
+                            ops = ops | SelectionKey.OP_ACCEPT;
+                          }
+                        else
+                          {
+                            ops = ops | SelectionKey.OP_READ;
+                          }
+                      }
+                  }
+
+                // Set new ready write ops
+                for (int i = 0; i < write.length; i++)
+                  {
+                    if (key.getNativeFD() == write[i])
+                      {
+                        ops = ops | SelectionKey.OP_WRITE;
+
+        //                 if (key.channel ().isConnected ())
+        //                   {
+        //                     ops = ops | SelectionKey.OP_WRITE;
+        //                   }
+        //                 else
+        //                   {
+        //                     ops = ops | SelectionKey.OP_CONNECT;
+        //                   }
+                     }
+                  }
+
+                // FIXME: We dont handle exceptional file descriptors yet.
+
+                // If key is not yet selected add it.
+                if (!selected.contains (key))
+                  {
+                    selected.add (key);
+                  }
+
+                // Set new ready ops
+                key.readyOps (key.interestOps () & ops);
+              }
+            deregisterCancelledKeys();
+            
+            return result;
+          }
+        }
   }
     
   public final Set selectedKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return selected;
   }
 
   public final Selector wakeup()
   {
-    return null;
+    // IMPLEMENTATION NOTE: Whereas the specification says that
+    // thread interruption should trigger a call to wakeup, we
+    // do the reverse under the covers: wakeup triggers a thread
+    // interrupt followed by a subsequent reset of the thread's
+    // interrupt status within select().
+    
+    // First, acquire the monitor of the object regulating
+    // access to our selectThread and unhandledWakeup fields.
+    synchronized (selectThreadMutex)
+      {
+        unhandledWakeup = true;
+        
+        // Interrupt any thread which is currently blocked in
+        // a select operation.
+        if (selectThread != null)
+          selectThread.interrupt ();
+      }
+      
+    return this;
   }
 
   private final void deregisterCancelledKeys()
   {
-    Iterator it = cancelledKeys().iterator();
-
-    while (it.hasNext ())
-      {
-        keys.remove ((SelectionKeyImpl) it.next ());
-        it.remove ();
-      }
+    Set ckeys = cancelledKeys ();
+    synchronized (ckeys)
+    {
+      Iterator it = ckeys.iterator();
+
+      while (it.hasNext ())
+        {
+          keys.remove ((SelectionKeyImpl) it.next ());
+          it.remove ();
+        }
+    }
   }
 
   protected SelectionKey register (SelectableChannel ch, int ops, Object att)
@@ -282,7 +407,11 @@ public class SelectorImpl extends AbstractSelector
         throw new InternalError ("No known channel type");
       }
 
-    keys.add (result);
+    synchronized (keys)
+      {
+        keys.add (result);
+      }
+
     result.interestOps (ops);
     result.attach (att);
     return result;
index 58ce0c8..ca77187 100644 (file)
@@ -39,6 +39,7 @@ exception statement from your version. */
 package java.nio.channels.spi;
 
 import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Set;
@@ -64,7 +65,7 @@ public abstract class AbstractSelector extends Selector
    * 
    * @exception IOException If an error occurs
    */
-  public final void close () throws IOException
+  public final synchronized void close () throws IOException
   {
     if (closed)
       return;
@@ -102,12 +103,18 @@ public abstract class AbstractSelector extends Selector
 
   protected final Set cancelledKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return cancelledKeys;
   }
 
   final void cancelKey (AbstractSelectionKey key)
   {
-    cancelledKeys.remove (key);
+    synchronized (cancelledKeys)
+      {
+        cancelledKeys.remove(key);
+      }
   }
 
   /**