[autotest] Add -m option to gs_offloader

gs_offloader used -m option by default when running gsutil.
Make -m an option of gs_offloader, when it is enabled, then gs_offloader
uses -m option for gsutil. Otherwise, run gsutil without -m.

BUG=chromium:513467
TEST=Run gs_offloader in local machine and see gsutil is run
with/without -m option.
site_utils/gs_offloader_unittest.py
DEPLOY=gs_offloader,gs_offloader_s

Change-Id: If4db154f5c5e9649c032be6236eececec363e7c0
Reviewed-on: https://chromium-review.googlesource.com/292811
Trybot-Ready: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index 1356f9e..ed889e4 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -113,9 +113,10 @@
   raise TimeoutException('Process Timed Out')
 
 
-def get_cmd_list(dir_entry, gs_path):
+def get_cmd_list(multiprocessing, dir_entry, gs_path):
   """Return the command to offload a specified directory.
 
+  @param multiprocessing: True to turn on -m option for gsutil.
   @param dir_entry: Directory entry/path that which we need a cmd_list to
                     offload.
   @param gs_path: Location in google storage where we will
@@ -124,10 +125,17 @@
   @return: A command list to be executed by Popen.
 
   """
+  cmd = ['gsutil']
+  if multiprocessing:
+      cmd.append('-m')
   if USE_RSYNC_ENABLED:
-    return ['gsutil', '-m', 'rsync', '-eR',
-            dir_entry, os.path.join(gs_path, os.path.basename(dir_entry))]
-  return ['gsutil', '-m', 'cp', '-eR', dir_entry, gs_path]
+      cmd.append('rsync')
+      target = os.path.join(gs_path, os.path.basename(dir_entry))
+  else:
+      cmd.append('cp')
+      target = gs_path
+  cmd += ['-eR', dir_entry, target]
+  return cmd
 
 
 def get_directory_size_kibibytes_cmd_list(directory):
@@ -213,10 +221,11 @@
       logging.error('Failed to modify permission for %s: %s', dir_entry, e)
 
 
-def get_offload_dir_func(gs_uri):
+def get_offload_dir_func(gs_uri, multiprocessing):
   """Returns the offload directory function for the given gs_uri
 
   @param gs_uri: Google storage bucket uri to offload to.
+  @param multiprocessing: True to turn on -m option for gsutil.
 
   @returns offload_dir function to preform the offload.
   """
@@ -241,8 +250,9 @@
       process = None
       signal.alarm(OFFLOAD_TIMEOUT_SECS)
       gs_path = '%s%s' % (gs_uri, dest_path)
-      process = subprocess.Popen(get_cmd_list(dir_entry, gs_path),
-                                 stdout=stdout_file, stderr=stderr_file)
+      process = subprocess.Popen(
+          get_cmd_list(multiprocessing, dir_entry, gs_path),
+          stdout=stdout_file, stderr=stderr_file)
       process.wait()
       signal.alarm(0)
 
@@ -337,7 +347,7 @@
   # TODO (sbasi) - Try to use the gsutil command to check write access.
   # Ensure we have write access to gs_uri.
   dummy_file = tempfile.NamedTemporaryFile()
-  test_cmd = get_cmd_list(dummy_file.name, gs_uri)
+  test_cmd = get_cmd_list(False, dummy_file.name, gs_uri)
   while True:
     try:
       subprocess.check_call(test_cmd)
@@ -375,7 +385,8 @@
     else:
       self.gs_uri = utils.get_offload_gsuri()
       logging.debug('Offloading to: %s', self.gs_uri)
-      self._offload_func = get_offload_dir_func(self.gs_uri)
+      self._offload_func = get_offload_dir_func(
+          self.gs_uri, options.multiprocessing)
     classlist = []
     if options.process_hosts_only or options.process_all:
       classlist.append(job_directories.SpecialJobDirectory)
@@ -502,6 +513,8 @@
   parser.add_option('-l', '--log_size', dest='log_size',
                     help='Limit the offloader logs to a specified number of '
                          'Mega Bytes.', type='int', default=0)
+  parser.add_option('-m', dest='multiprocessing', action='store_true',
+                    help='Turn on -m option for gsutil.', default=False)
   options = parser.parse_args()[0]
   if options.process_all and options.process_hosts_only:
     parser.print_help()
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
index 811af09..5f0883d 100644
--- a/site_utils/gs_offloader_unittest.py
+++ b/site_utils/gs_offloader_unittest.py
@@ -72,9 +72,9 @@
         else:
             expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
         utils.get_offload_gsuri().AndReturn(expected_gsuri)
-        offload_func = gs_offloader.get_offload_dir_func(expected_gsuri)
+        offload_func = gs_offloader.get_offload_dir_func(expected_gsuri, False)
         self.mox.StubOutWithMock(gs_offloader, 'get_offload_dir_func')
-        gs_offloader.get_offload_dir_func(expected_gsuri).AndReturn(
+        gs_offloader.get_offload_dir_func(expected_gsuri, False).AndReturn(
                 offload_func)
         self.mox.ReplayAll()
         return offload_func
@@ -296,11 +296,12 @@
 class CommandListTests(unittest.TestCase):
     """Tests for `get_cmd_list()`."""
 
-    def _command_list_assertions(self, job, use_rsync=True):
+    def _command_list_assertions(self, job, use_rsync=True, multi=False):
         """Call `get_cmd_list()` and check the return value.
 
         Check the following assertions:
           * The command name (argv[0]) is 'gsutil'.
+          * '-m' option (argv[1]) is on when the argument, multi, is True.
           * The arguments contain the 'cp' subcommand.
           * The next-to-last argument (the source directory) is the
             job's `queue_args[0]`.
@@ -309,6 +310,8 @@
 
         @param job A job with properly calculated arguments to
                    `get_cmd_list()`
+        @param use_rsync True when using 'rsync'. False when using 'cp'.
+        @param multi True when using '-m' option for gsutil.
 
         """
         test_bucket_uri = 'gs://a-test-bucket'
@@ -316,10 +319,12 @@
         gs_offloader.USE_RSYNC_ENABLED = use_rsync
 
         command = gs_offloader.get_cmd_list(
-            job.queue_args[0],
-            os.path.join(test_bucket_uri, job.queue_args[1]))
+                multi, job.queue_args[0],
+                os.path.join(test_bucket_uri, job.queue_args[1]))
 
         self.assertEqual(command[0], 'gsutil')
+        if multi:
+            self.assertEqual(command[1], '-m')
         self.assertEqual(command[-2], job.queue_args[0])
 
         if use_rsync:
@@ -356,6 +361,18 @@
         self._command_list_assertions(job, use_rsync=False)
 
 
+    def test_get_cmd_list_regular_multi(self):
+        """Test `get_cmd_list()` as for a regular job with True multi."""
+        job = _MockJobDirectory('118-debug')
+        self._command_list_assertions(job, multi=True)
+
+
+    def test_get_cmd_list_special_multi(self):
+        """Test `get_cmd_list()` as for a special job with True multi."""
+        job = _MockJobDirectory('hosts/host1/118-reset')
+        self._command_list_assertions(job, multi=True)
+
+
 # Below is partial sample of e-mail notification text.  This text is
 # deliberately hard-coded and then parsed to create the test data;
 # the idea is to make sure the actual text format will be reviewed
@@ -697,9 +714,9 @@
         signal.alarm(gs_offloader.OFFLOAD_TIMEOUT_SECS)
         command.append(queue_args[0])
         gs_offloader.get_cmd_list(
-                queue_args[0], '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
-                                         queue_args[1])).AndReturn(
-                        command)
+                False, queue_args[0],
+                '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
+                          queue_args[1])).AndReturn(command)
         signal.alarm(0)
         signal.alarm(0)
 
@@ -716,8 +733,9 @@
         """
         self.mox.ReplayAll()
         gs_offloader.get_offload_dir_func(
-                utils.DEFAULT_OFFLOAD_GSURI)(self._job.queue_args[0],
-                                     self._job.queue_args[1])
+                utils.DEFAULT_OFFLOAD_GSURI, False)(
+                        self._job.queue_args[0],
+                        self._job.queue_args[1])
         self.mox.VerifyAll()
         self.assertEqual(not should_succeed,
                          os.path.isdir(self._job.queue_args[0]))
@@ -759,7 +777,7 @@
         """
         signal.alarm(gs_offloader.OFFLOAD_TIMEOUT_SECS)
         gs_offloader.get_cmd_list(
-                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
+                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                         ['test', '-d', self._job.queue_args[0]])
         signal.alarm(0).AndRaise(
                 gs_offloader.TimeoutException('fubar'))