Many programming languages have the concept of a pipeline operator that looks a little like this |>. If you’re not familiar with them you’ll more than likely at least be familiar with the pipe | operator in bash for sending data between commands. There’s even a proposal to add the pipeline operator to JS.

But I like writing Ruby. Ruby had a proposed pipeline operator and much was written about it including this great post by Brandon Weaver. The feature was shelved and will likely never be brought back up again, but…

Because we’re writing Ruby in the powerful land of everyone’s favorite language to write expressive DSLs I decided to implement my own and bastardize the bitwise OR operator for my own purposes.

By the end of this blog post we’ll have a working implementation that will allow this code to be valid.

string_sym = pipe(:foo) | Foo('bar') | Wat
puts string_sym

num = pipe(2.0) | Times(2) | Unwrap
puts num

a = pipe(Alphabet) | Reverse | Upcase
puts a

Now that may look silly, but that’s only because it is. So let’s get to the implementation.

Let’s start by not clobbering the bitwise OR operator outright and only use it on values we have deemed should be piped into something else.

Overriding The Pipe Operator

def pipe(input)
  input.define_singleton_method(:|) do |filter|
    pipe filter.call(input)
  end

  input
end

With this we’ll only redefine the pipeline operator on objects we call with pipe(). Let’s try it.

alphabet = ('a'..'z').to_a

def reverse(arr)
  arr.reverse
end

pipe(alphabet) | reverse

This is going to fail with 'reverse': wrong number of arguments (given 0, expected 1) (ArgumentError). We’re actually calling the reverse method before our pipeline manages to pass itself in as an argument. An easy way around this is to wrap the reverse part of the pipeline as a proc or convert it to one using method(:reverse).

alphabet = ('a'..'z').to_a

def reverse(arr)
  arr.reverse
end

pipe(alphabet) | method(:reverse)

will return a reversed array of the alphabet! We have our first working pipeline in Ruby! Now let’s start attempting to pipe some more types.

def times(i, n)
  n * i
end
pipe(2.0) | method(:times)

Immediate Values

Hmm, it seems can’t use define_singleton_method on a Float. “Immediate values” as it were, described by Matz cannot have singleton methods defined on them at runtime. So we’ll need some more magic to make them pipeable. Why don’t we try wrapping the values into something more palatable.

Let’s try implementing a few new methods to go along with pipe.

def wrap_immediate_value(input)
  pipe_method_wrapper = OpenStruct.new(value: input)

  pipe_method_wrapper.define_singleton_method(:|) do |filter|
    pipe filter.call(input)
  end

  pipe_method_wrapper
end

def define_pipe(input)
  input.define_singleton_method(:|) do |filter|
    pipe filter.call(input)
  end

  input
end

def wrap?(type)
  type.is_a?(Numeric) || type.is_a?(Symbol)
end

def pipe(input)
  return wrap_immediate_value(input) if wrap?(input)

  define_pipe(input)
end

def times(i,n)
  n * i
end

pipe(2.0) | method(:times)

Awesome! We have a working pipeline with immediate values, however the astute reader may have noticed the arity of the times method is actually two, when in fact we’re only calling it with 1 in our pipeline. When running our newly wrapped values we get 'times': wrong number of arguments (given 1, expected 2). Now we need to get around this pesky method/proc conversion problem and handle currying. Let’s also go ahead and start containing this code properly. The easiest way around this is to write code in a much more unconventional way so that we can easily manipulate the blocks of code we’re pipelining via method_missing. If we instead write our methods as constants on a module we can redirect calls to them.

module Main
  include Pipe

  Times = -> (i, n) { n * i }

  def self.run
    num = pipe(2.0) | Times(2)
    puts num
  end
end

First we begin by fetching the const off our module, calling it if all parameters are fulfilled, or currying what’s left.

def method_missing(method, *args, &block)
  code = self.const_get(method)

  if code.arity.zero?
    code.call
  elsif code.arity == args.length
    code.call(*args)
  else
    curry(code, args)
  end
end

We can curry quite simply given our code is already in block form and we’re being handed the arguments.

def curry(code, args)
  -> (input) {
    if code.arity == 1
      code.call(input)
    else
      code.call(input, *args)
    end
  }
end

Now bring it all together.

require 'ostruct'

module Pipe
  module ClassMethods
    def wrap_immediate_value(input)
      pipe_method_wrapper = OpenStruct.new(value: input)

      pipe_method_wrapper.define_singleton_method(:|) do |filter|
        Pipe.pipe filter.call(input)
      end

      pipe_method_wrapper
    end

    def define_pipe(input)
      input.define_singleton_method(:|) do |filter|
        Pipe.pipe filter.call(input)
      end

      input
    end

    def wrap?(type)
      type.is_a?(Numeric) || type.is_a?(Symbol)
    end

    def pipe(input)
      return wrap_immediate_value(input) if wrap?(input)

      define_pipe(input)
    end

    def curry(code, args)
      -> (input) {
        if code.arity == 1
          code.call(input)
        else
          code.call(input, *args)
        end
      }
    end

    def method_missing(method, *args, &block)
      code = self.const_get(method)

      if code.arity.zero?
        code.call
      elsif code.arity == args.length
        code.call(*args)
      else
        curry(code, args)
      end
    end
  end

  extend ClassMethods
  def self.included(other)
    other.extend(ClassMethods)
  end
end

Finally this will let us write code that can execute like so.

module Main
  include Pipe

  Times = -> (i, n) { n * i }

  def self.run
    num = pipe(2.0) | Times(2)
    puts num
  end
end

Almost there! If you ran the code above you’ll notice we don’t actually get 4.0 at the end of the pipeline! We instead get #<OpenStruct value=4.0>. That’s because we never unwrapped the immediate value and are simply returning the OpenStruct. We can solve this by creating a noop Unwrap function that’s explictly checked when defining the singleton method.

module Pipe
  Noop = -> {}
  Unwrap = Noop

  module ClassMethods
    def wrap_immediate_value(input)
      pipe_method_wrapper = OpenStruct.new(value: input)

      pipe_method_wrapper.define_singleton_method(:|) do |filter|
        if Unwrap.object_id == filter.object_id
          input
        else
          Pipe.pipe filter.call(input)
        end
      end

      pipe_method_wrapper
    end
  end
end

Now finally the pipeline would look like this

module Main
  include Pipe

  Times = -> (i, n) { n * i }

  def self.run
    num = pipe(2.0) | Times(2) | Unwrap
    puts num
  end
end

Now we can write full fledged pipelines! Here’s an example of a module with several pipelines covering lots of cases.

module Main
  include Pipe

  Alphabet = ('a'..'z').to_a

  Reverse = -> (arr) { arr.reverse }
  Upcase  = -> (arr) { arr.map(&:upcase) }

  Times = -> (i, n) { n * i }

  Foo = -> (foo, bar) { "#{foo} #{bar}" }

  Wat = -> (omg) { "wtf bbq #{omg}" }

  Join = -> (arr) { arr.join }

  def self.run
    string_sym = pipe(:foo) | Foo('bar') | Wat
    puts string_sym

    num = pipe(2.0) | Times(2) | Unwrap
    puts num

    a = pipe(Alphabet) | Reverse | Upcase | Join
    puts a
  end
end

Main::run()

Running the above should output:

wtf bbq foo bar
4.0
ZYXWVUTSRQPONMLKJIHGFEDCBA

If you made it this far, congratulations! We’ve successfully, albeit painfully, implemented pipelines in Ruby. This truly is a testament to how moldable Ruby’s syntax is. I hope you enjoyed this adventure and learned something new!