summaryrefslogtreecommitdiff
path: root/ojluni/src/main/java/sun/nio/ch/AsynchronousChannelGroupImpl.java
blob: a584ce9dbf5a143f9703f245060851c5ce404766 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
/*
 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.Channel;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.io.FileDescriptor;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.security.PrivilegedAction;
import java.security.AccessController;
import java.security.AccessControlContext;
import sun.security.action.GetIntegerAction;

/**
 * Base implementation of AsynchronousChannelGroup
 */

abstract class AsynchronousChannelGroupImpl
    extends AsynchronousChannelGroup implements Executor
{
    // number of internal threads handling I/O events when using an unbounded
    // thread pool. Internal threads do not dispatch to completion handlers.
    private static final int internalThreadCount = AccessController.doPrivileged(
        new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));

    // associated thread pool
    private final ThreadPool pool;

    // number of tasks running (including internal)
    private final AtomicInteger threadCount = new AtomicInteger();

    // associated Executor for timeouts
    private ScheduledThreadPoolExecutor timeoutExecutor;

    // task queue for when using a fixed thread pool. In that case, thread
    // waiting on I/O events must be awokon to poll tasks from this queue.
    private final Queue<Runnable> taskQueue;

    // group shutdown
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private final Object shutdownNowLock = new Object();
    private volatile boolean terminateInitiated;

    AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
                                 ThreadPool pool)
    {
        super(provider);
        this.pool = pool;

        if (pool.isFixedThreadPool()) {
            taskQueue = new ConcurrentLinkedQueue<Runnable>();
        } else {
            taskQueue = null;   // not used
        }

        // use default thread factory as thread should not be visible to
        // application (it doesn't execute completion handlers).
        this.timeoutExecutor = (ScheduledThreadPoolExecutor)
            Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
        this.timeoutExecutor.setRemoveOnCancelPolicy(true);
    }

    final ExecutorService executor() {
        return pool.executor();
    }

    final boolean isFixedThreadPool() {
        return pool.isFixedThreadPool();
    }

    final int fixedThreadCount() {
        if (isFixedThreadPool()) {
            return pool.poolSize();
        } else {
            return pool.poolSize() + internalThreadCount;
        }
    }

    private Runnable bindToGroup(final Runnable task) {
        final AsynchronousChannelGroupImpl thisGroup = this;
        return new Runnable() {
            public void run() {
                Invoker.bindToGroup(thisGroup);
                task.run();
            }
        };
    }

    private void startInternalThread(final Runnable task) {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                // internal threads should not be visible to application so
                // cannot use user-supplied thread factory
                ThreadPool.defaultThreadFactory().newThread(task).start();
                return null;
            }
         });
    }

    protected final void startThreads(Runnable task) {
        if (!isFixedThreadPool()) {
            for (int i=0; i<internalThreadCount; i++) {
                startInternalThread(task);
                threadCount.incrementAndGet();
            }
        }
        if (pool.poolSize() > 0) {
            task = bindToGroup(task);
            try {
                for (int i=0; i<pool.poolSize(); i++) {
                    pool.executor().execute(task);
                    threadCount.incrementAndGet();
                }
            } catch (RejectedExecutionException  x) {
                // nothing we can do
            }
        }
    }

    final int threadCount() {
        return threadCount.get();
    }

    /**
     * Invoked by tasks as they terminate
     */
    final int threadExit(Runnable task, boolean replaceMe) {
        if (replaceMe) {
            try {
                if (Invoker.isBoundToAnyGroup()) {
                    // submit new task to replace this thread
                    pool.executor().execute(bindToGroup(task));
                } else {
                    // replace internal thread
                    startInternalThread(task);
                }
                return threadCount.get();
            } catch (RejectedExecutionException x) {
                // unable to replace
            }
        }
        return threadCount.decrementAndGet();
    }

    /**
     * Wakes up a thread waiting for I/O events to execute the given task.
     */
    abstract void executeOnHandlerTask(Runnable task);

    /**
     * For a fixed thread pool the task is queued to a thread waiting on I/O
     * events. For other thread pools we simply submit the task to the thread
     * pool.
     */
    final void executeOnPooledThread(Runnable task) {
        if (isFixedThreadPool()) {
            executeOnHandlerTask(task);
        } else {
            pool.executor().execute(bindToGroup(task));
        }
    }

    final void offerTask(Runnable task) {
        taskQueue.offer(task);
    }

    final Runnable pollTask() {
        return (taskQueue == null) ? null : taskQueue.poll();
    }

    final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
        try {
            return timeoutExecutor.schedule(task, timeout, unit);
        } catch (RejectedExecutionException rej) {
            if (terminateInitiated) {
                // no timeout scheduled as group is terminating
                return null;
            }
            throw new AssertionError(rej);
        }
    }

    @Override
    public final boolean isShutdown() {
        return shutdown.get();
    }

    @Override
    public final boolean isTerminated()  {
        return pool.executor().isTerminated();
    }

    /**
     * Returns true if there are no channels in the group
     */
    abstract boolean isEmpty();

    /**
     * Attaches a foreign channel to this group.
     */
    abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
        throws IOException;

    /**
     * Detaches a foreign channel from this group.
     */
    abstract void detachForeignChannel(Object key);

    /**
     * Closes all channels in the group
     */
    abstract void closeAllChannels() throws IOException;

    /**
     * Shutdown all tasks waiting for I/O events.
     */
    abstract void shutdownHandlerTasks();

    private void shutdownExecutors() {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {
                pool.executor().shutdown();
                timeoutExecutor.shutdown();
                return null;
            }
        });
    }

    @Override
    public final void shutdown() {
        if (shutdown.getAndSet(true)) {
            // already shutdown
            return;
        }
        // if there are channels in the group then shutdown will continue
        // when the last channel is closed
        if (!isEmpty()) {
            return;
        }
        // initiate termination (acquire shutdownNowLock to ensure that other
        // threads invoking shutdownNow will block).
        synchronized (shutdownNowLock) {
            if (!terminateInitiated) {
                terminateInitiated = true;
                shutdownHandlerTasks();
                shutdownExecutors();
            }
        }
    }

    @Override
    public final void shutdownNow() throws IOException {
        shutdown.set(true);
        synchronized (shutdownNowLock) {
            if (!terminateInitiated) {
                terminateInitiated = true;
                closeAllChannels();
                shutdownHandlerTasks();
                shutdownExecutors();
            }
        }
    }

    /**
     * For use by AsynchronousFileChannel to release resources without shutting
     * down the thread pool.
     */
    final void detachFromThreadPool() {
        if (shutdown.getAndSet(true))
            throw new AssertionError("Already shutdown");
        if (!isEmpty())
            throw new AssertionError("Group not empty");
        shutdownHandlerTasks();
    }

    @Override
    public final boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException
    {
        return pool.executor().awaitTermination(timeout, unit);
    }

    /**
     * Executes the given command on one of the channel group's pooled threads.
     */
    @Override
    public final void execute(Runnable task) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            // when a security manager is installed then the user's task
            // must be run with the current calling context
            final AccessControlContext acc = AccessController.getContext();
            final Runnable delegate = task;
            task = new Runnable() {
                @Override
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        @Override
                        public Void run() {
                            delegate.run();
                            return null;
                        }
                    }, acc);
                }
            };
        }
        executeOnPooledThread(task);
    }
}